From 18526f8b45ca1edfa57e824aec1300becce80c83 Mon Sep 17 00:00:00 2001 From: Carsten Kvist Date: Thu, 2 Apr 2026 23:36:17 +0200 Subject: [PATCH] =?UTF-8?q?Godt=20p=C3=A5=20vej?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- udpak_semistruktur/config.py | 471 ++++++++++++++++++++++++ udpak_semistruktur/db.py | 62 ++++ udpak_semistruktur/extract/reader.py | 129 +++++++ udpak_semistruktur/extract/traversal.py | 217 +++++++++++ udpak_semistruktur/load/db_writer.py | 99 +++++ udpak_semistruktur/load/file_writer.py | 59 +++ udpak_semistruktur/transform/clean.py | 145 ++++++++ udpak_semistruktur/transform/convert.py | 195 ++++++++++ udpak_semistruktur/transform/reshape.py | 190 ++++++++++ 9 files changed, 1567 insertions(+) diff --git a/udpak_semistruktur/config.py b/udpak_semistruktur/config.py index e69de29..de7298b 100644 --- a/udpak_semistruktur/config.py +++ b/udpak_semistruktur/config.py @@ -0,0 +1,471 @@ +import os +import re +import yaml +import hashlib + +from typing import Any +from datetime import datetime +from copy import deepcopy + +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +# ========================= +# Hjælpefunktioner +# ========================= + +def _resolve_include_path(base_file: str, include_path: str) -> str: + """Gør relative include-stier relative til YAML-filens mappe.""" + if os.path.isabs(include_path): + return include_path + base_dir = os.path.dirname(os.path.abspath(base_file)) + return os.path.abspath(os.path.join(base_dir, include_path)) + + +def _deep_merge_dicts(a: dict, b: dict) -> dict: + """Dyb-merge a <- b. Lister overskrives (undtagen håndteres særskilt i deep_merge_lists).""" + a = a or {} + b = b or {} + out = dict(a) + for k, v in b.items(): + if isinstance(v, dict) and isinstance(out.get(k), dict): + out[k] = _deep_merge_dicts(out[k], v) + else: + out[k] = deepcopy(v) + return out + + +def deep_merge_lists(base_cfg: dict, incoming_cfg: dict) -> dict: + """ + Robust top-level merge a <- b: + - Dicts merges rekursivt. + - For nøglen 'output_filer' APPEND'er vi (liste + liste). + - Øvrige lister overskrives. + Håndterer at en af parterne kan være None. + """ + base_cfg = base_cfg or {} + incoming_cfg = incoming_cfg or {} + + if not isinstance(base_cfg, dict) or not isinstance(incoming_cfg, dict): + return deepcopy(incoming_cfg) + + out = dict(base_cfg) + for key, value in incoming_cfg.items(): + if key == "output_filer": + base_list = out.get("output_filer") + if not isinstance(base_list, list): + base_list = [] if base_list is None else [base_list] + if isinstance(value, list): + out["output_filer"] = base_list + deepcopy(value) + elif value is None: + out["output_filer"] = base_list + else: + out["output_filer"] = base_list + [deepcopy(value)] + else: + existing = out.get(key) + if isinstance(existing, dict) and isinstance(value, dict): + out[key] = deep_merge_lists(existing, value) + else: + out[key] = deepcopy(value) + return out + + +def _expand_output_groups_new_only(cfg: dict) -> dict: + """ + Kræv den nye grupperede form for 'output_filer': + - hvert element: { navn?, rod, kolonner(list), outputs: [ {type:'fil'|'tabel', ...}, ... ] } + - 'type' kan udelades hvis fil/tabel kan udledes (fil_navn/tabel_navn). + - 'overskrifter' på gruppeniveau bruges som default til fil-børn, hvis de ikke selv sætter det. + Returnerer cfg med flad 'output_filer'. + """ + if "output_filer" not in cfg or not isinstance(cfg["output_filer"], list): + raise ValueError("'output_filer'-sektionen skal være en liste af grupper.") + + flattened = [] + for idx, group in enumerate(cfg["output_filer"]): + if not isinstance(group, dict): + raise ValueError(f"'output_filer' element #{idx+1} skal være et objekt.") + if "outputs" not in group or not isinstance(group["outputs"], list) or not group["outputs"]: + raise ValueError(f"Gruppe #{idx+1} mangler en ikke-tom 'outputs'-liste.") + + # Fælles felter – KRÆVES + if "rod" not in group: + raise ValueError(f"Gruppe #{idx+1} mangler 'rod'.") + if "kolonner" not in group or not isinstance(group["kolonner"], list): + raise ValueError(f"Gruppe #{idx+1} mangler 'kolonner' (liste).") + + common = { + "rod": group["rod"], + "kolonner": deepcopy(group["kolonner"]), + } + if "hvis_findes" in group: + common["hvis_findes"] = deepcopy(group["hvis_findes"]) + + group_default_headers = group.get("overskrifter", None) # optional default for filer + + for cidx, child in enumerate(group["outputs"]): + if not isinstance(child, dict): + raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal være et objekt.") + + # Udled type, hvis mulig + ctype = child.get("type") + if ctype is None: + if "fil_navn" in child: + ctype = "fil" + elif "tabel_navn" in child: + ctype = "tabel" + else: + raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} mangler 'type' og kan ikke udledes.") + + if ctype not in ("fil", "tabel"): + raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} har ukendt type '{ctype}'.") + + merged = _deep_merge_dicts(common, child) + + # Sikr præcis én destination + has_file = "fil_navn" in merged + has_table = "tabel_navn" in merged + if has_file and has_table: + raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} må ikke have både 'fil_navn' og 'tabel_navn'.") + if not has_file and not has_table: + raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal have enten 'fil_navn' eller 'tabel_navn'.") + + # 'overskrifter' gælder kun for filer; brug gruppens default hvis ikke sat + if ctype == "fil": + if "overskrifter" not in merged and group_default_headers is not None: + merged["overskrifter"] = group_default_headers + else: + merged.pop("overskrifter", None) + + # Lidt housekeeping + merged["type"] = ctype + if "navn" in group: + merged["_gruppe_navn"] = group["navn"] + + flattened.append(merged) + + out = dict(cfg) + out["output_filer"] = flattened + return out + + +# ========================= +# Hovedfunktion +# ========================= + +def valider_yaml(yaml_file_path: str) -> dict: + """ + Validerer YAML-konfigurationen og udvider env-variabler. + Understøtter KUN den nye grupperede model for 'output_filer'. + """ + + # 1) Eksistens + if not os.path.exists(yaml_file_path): + raise FileNotFoundError(f"Konfigurationsfilen '{yaml_file_path}' findes ikke.") + + # 2) Indlæs base + with open(yaml_file_path, "r", encoding="utf-8") as f: + config = yaml.safe_load(f) or {} + + # 3) Env-vars før include + config = erstat_env_vars(config) + + # 4) Include + combined = {k: v for k, v in config.items() if k != "include"} + included_files = config.get("include", []) + if isinstance(included_files, str): + included_files = [included_files] + + for inc in included_files: + resolved = _resolve_include_path(yaml_file_path, inc) + if not os.path.exists(resolved): + raise FileNotFoundError(f"Inkluderet YAML-fil '{inc}' findes ikke!") + with open(resolved, "r", encoding="utf-8") as f: + inc_cfg = yaml.safe_load(f) or {} + inc_cfg = erstat_env_vars(inc_cfg) + combined = deep_merge_lists(combined, inc_cfg) + + config = combined + config.pop("include", None) + + # 5) Kræv 'config' + if "config" not in config: + raise ValueError("YAML-filen skal indeholde en 'config'-sektion.") + + # 6) Env-vars igen (efter merge) + config = erstat_env_vars(config) + global_config = config["config"] + + # 7) Rensedata + rens_intervaller = [] + rens_intervaller_erstats = [] + rens_chars = {} + rens_cfg = config.get("rens", {}) or {} + + for group_name, group_items in rens_cfg.items(): + if group_name == "regex": + continue + if not isinstance(group_items, list): + continue + for item in group_items: + if isinstance(item, dict): + tegn = item.get("tegn") + erstat = item.get("erstat", "") + else: + tegn = item + erstat = "" + if isinstance(tegn, str) and "-" in tegn and len(tegn) >= 3: + rens_intervaller.append(tegn) + rens_intervaller_erstats.append(erstat) + else: + rens_chars[tegn] = erstat + + if rens_intervaller: + parts = [f"(?P[{interval}])" for i, interval in enumerate(rens_intervaller)] + rens_regex = re.compile("|".join(parts), flags=re.UNICODE) + else: + rens_regex = None + + rens_regex_rules = [] + for rx in rens_cfg.get("regex", []) or []: + pattern = rx.get("mønster") or rx.get("monster") or rx.get("pattern") + repl = rx.get("erstat", "") + if pattern: + rens_regex_rules.append((re.compile(pattern, flags=re.UNICODE), repl)) + + global_config["rens_intervaller_regex"] = rens_regex + global_config["rens_intervaller_erstats"] = rens_intervaller_erstats + global_config["rens_all_chars"] = rens_chars + global_config["rens_regex_rules"] = rens_regex_rules + + # 8) Output-sti + if "output_path" in global_config and global_config["output_path"] is not None: + sti = str(global_config["output_path"]) + if not sti.endswith(os.sep): + sti += os.sep + global_config["output_path"] = sti + if not os.path.exists(global_config["output_path"]): + raise FileNotFoundError(f"Output-stien '{global_config['output_path']}' findes ikke.") + if not os.access(global_config["output_path"], os.W_OK): + raise PermissionError(f"Output-stien '{global_config['output_path']}' er ikke skrivbar.") + + # 9) Dato + if "dato" in global_config and global_config["dato"] is not None: + if "dato_format" not in global_config: + raise ValueError("Hvis 'dato' er angivet i YAML, skal 'dato_format' også være til stede.") + try: + global_config["dato"] = datetime.strptime(str(global_config["dato"]), str(global_config["dato_format"])) + except ValueError: + raise ValueError( + f"Forkert datoformat i config: '{global_config['dato']}'. " + f"Forventet format: '{global_config['dato_format']}'." + ) + else: + global_config["dato"] = datetime.now() + + # 10) Output-filer (NY model -> flad) + config = _expand_output_groups_new_only(config) + + + + # 11) Basisvalideringer pr. output + if "output_filer" not in config or not isinstance(config["output_filer"], list) or not config["output_filer"]: + raise ValueError("'output_filer' skal være en ikke-tom liste (efter grupper er udvidet).") + + for file_cfg in config["output_filer"]: + if not isinstance(file_cfg, dict): + raise ValueError("Hvert output-element skal være et objekt.") + if ("fil_navn" in file_cfg) == ("tabel_navn" in file_cfg): + raise ValueError("Hvert output skal have præcis én af 'fil_navn' eller 'tabel_navn'.") + if "rod" not in file_cfg: + raise ValueError("Hvert output skal have en 'rod'.") + if "kolonner" not in file_cfg or not isinstance(file_cfg["kolonner"], list): + raise ValueError("Hvert output skal have 'kolonner' som en liste.") + if "tabel_navn" in file_cfg: + file_cfg.pop("overskrifter", None) # kun relevant for filer + + if "hvis_findes" in file_cfg: + hf = file_cfg["hvis_findes"] + + if isinstance(hf, str): + file_cfg["hvis_findes"] = [hf] + hf = file_cfg["hvis_findes"] + + if not isinstance(hf, list): + raise ValueError("'hvis_findes' skal være en streng eller en liste af strenge.") + + for i, sti in enumerate(hf): + if not isinstance(sti, str) or not sti.strip(): + raise ValueError(f"'hvis_findes' element #{i+1} skal være en ikke-tom streng.") + + # Hash-validering + hash_fields = [c for c in file_cfg["kolonner"] if isinstance(c, dict) and c.get("type") == "hash"] + if len(hash_fields) > 1: + raise ValueError("Der må maksimalt være én 'hash'-kolonne pr. output.") + if hash_fields: + algo = hash_fields[0].get("hash_algorithm", "sha256") + if algo not in hashlib.algorithms_guaranteed: + raise ValueError(f"Forkert HASH-algoritme: '{algo}'. Gyldige værdier: {', '.join(sorted(hashlib.algorithms_guaranteed))}") + + # Kolonne-regler + for col in file_cfg["kolonner"]: + if not isinstance(col, dict): + continue # skabelon-kortform er OK + if col.get("type") == "hash" and "felt" in col: + raise ValueError("En 'hash'-kolonne må ikke have et 'felt'.") + if col.get("type") == "id" and "felt" in col: + raise ValueError("En 'id'-kolonne må ikke have et 'felt'.") + + + # Udfyld manglende kolonnenavne (ikke for skabelon/hash/id) + for i, col in enumerate(file_cfg["kolonner"]): + if isinstance(col, dict): + if not col.get("navn") and col.get("type") not in ("hash", "id") and "skabelon" not in col: + col["navn"] = f"kolonne_{i+1}" + + # Dubletnavne + navne = [c.get("navn") for c in file_cfg["kolonner"] if isinstance(c, dict) and c.get("navn")] + dups = sorted({n for n in navne if navne.count(n) > 1}) + if dups: + target = file_cfg.get("fil_navn", file_cfg.get("tabel_navn", "ukendt")) + raise ValueError(f"Kolonnenavne i output '{target}' indeholder dubletter: {', '.join(dups)}") + + # 12) Input-krav + if not ("input_fil" in global_config or "input_fil_liste" in global_config): + raise ValueError("Der skal være enten 'input_fil' eller 'input_fil_liste' i 'config'-sektionen.") + if "input_fil" in global_config and "input_fil_liste" in global_config: + raise ValueError("Kun én af 'input_fil' eller 'input_fil_liste' må angives i 'config'.") + + + # 13) Defaults + global_config.setdefault("miljø", "udv") + global_config.setdefault("skrivetilstand", "w") + global_config.setdefault("encoding", "utf-8") + global_config.setdefault("separator", "\t") + global_config.setdefault("logfil", "log_{yyyy}{mm}{dd}.txt") + global_config.setdefault("debug", False) + global_config.setdefault("dan_ok", True) + global_config.setdefault("global_rens", False) + global_config.setdefault("global_fjern_linjeskift", False) + global_config.setdefault("global_dato_ind", "%Y-%m-%d") + global_config.setdefault("global_dato_ud", "%Y-%m-%d") + global_config.setdefault("fejl_fil_ext", None) + global_config.setdefault("stop_ved_0_output", False) + global_config.setdefault("db_char_set", "latin-1") + + # 14) Logfilnavn formateres med dato + global_config["logfil"] = str(global_config["logfil"]).format( + yy=global_config["dato"].strftime("%y"), + yyyy=global_config["dato"].strftime("%Y"), + mm=global_config["dato"].strftime("%m"), + dd=global_config["dato"].strftime("%d"), + ) + + # 15) Timestamp + global_config["var_timestamp"] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f") + + # 16) Debug + if global_config.get("debug") is True: + logger.debug("YAML-konfigurationen er gyldig.") + + # 17) Udvid kolonne-skabeloner (din eksisterende funktion) + config = udvid_kolonne_skabeloner(config) + + return config + + +def erstat_env_vars(config: Any) -> Any: + """Erstatter miljøvariabler på formen ${VAR} rekursivt i config-strukturen.""" + + if isinstance(config, dict): + return {key: erstat_env_vars(value) for key, value in config.items()} + elif isinstance(config, list): + return [erstat_env_vars(item) for item in config] + elif isinstance(config, str): + # Find og erstat alle miljøvariabler + start_index = config.find("${") + while start_index != -1: + end_index = config.find("}", start_index) + if end_index == -1: + break # Ugyldigt format, stop + + env_var = config[start_index + 2:end_index] # Udhent miljøvariabelnavn + env_value = os.getenv(env_var, None) # Hent værdi fra miljøet + + if env_value is None: + raise ValueError(f"Miljøvariabel '{env_var}' er ikke sat, men bruges i config-filen!") + + config = config.replace(f"${{{env_var}}}", env_value) # Erstat variablen + + start_index = config.find("${") # Søg efter næste variabel + + return config + else: + return config # Returnér uændret, hvis det ikke er en streng + + +def udvid_kolonne_skabeloner(config: dict) -> dict: + """Ekspanderer kolonne-skabelon-referencer i output_filer til flade kolonner.""" + + skabeloner = config.get("kolonne_skabeloner", {}) + + def expand_template(skabelonnavn, prefix="", prefix_felt="", skabelon_rod=""): + if skabelonnavn not in skabeloner: + raise ValueError(f"Skabelon '{skabelonnavn}' findes ikke.") + + udvidede = [] + for item in skabeloner[skabelonnavn]: + # Tjek om dette element definerer en ny rod + aktuel_rod = item.get("rod", "") + ny_rod = skabelon_rod + + if aktuel_rod: + if skabelon_rod and skabelon_rod != aktuel_rod: + raise ValueError(f"Konflikt: To rødder fundet ('{skabelon_rod}' og '{aktuel_rod}') i skabelon '{skabelonnavn}'.") + ny_rod = aktuel_rod + + if "skabelon" in item: + under_skabelon = item["skabelon"] + under_prefix = prefix + item.get("prefix", item.get("prefix_navn", "")) + under_prefix_felt = prefix_felt + item.get("prefix_felt", "") + if under_prefix_felt and not under_prefix_felt.endswith("."): + under_prefix_felt += "." + + # Giv den fundne rod videre til næste niveau + udvidede.extend(expand_template(under_skabelon, under_prefix, under_prefix_felt, ny_rod)) + else: + nyt_item = item.copy() + nyt_item["navn"] = prefix + item["navn"] + + if "felt" in nyt_item and nyt_item["felt"]: + nyt_item["felt"] = prefix_felt + item["felt"] + + # Sæt roden hvis den findes, ellers fejler vi ikke her (da den kan mangle helt) + if ny_rod: + nyt_item["rod"] = ny_rod + + udvidede.append(nyt_item) + return udvidede + + for output_fil in config.get("output_filer", []): + nye_kolonner = [] + for kol in output_fil.get("kolonner", []): + if "skabelon" in kol: + skabelon_rod = kol.get("rod", "") + prefix = kol.get("prefix", kol.get("prefix_navn", "")) + prefix_felt = kol.get("prefix_felt", "") + if prefix_felt: + if not prefix_felt.endswith("."): + prefix_felt += "." + nye_kolonner.extend(expand_template( + kol["skabelon"], + prefix, + prefix_felt, + skabelon_rod + )) + else: + nye_kolonner.append(kol) + output_fil["kolonner"] = nye_kolonner + + return config diff --git a/udpak_semistruktur/db.py b/udpak_semistruktur/db.py index e69de29..d6ff16c 100644 --- a/udpak_semistruktur/db.py +++ b/udpak_semistruktur/db.py @@ -0,0 +1,62 @@ +import os +import json +import base64 + +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def læs_json_fil(global_config: dict) -> tuple[str, str, str, str, str]: + """Henter database-credentials fra pwd.json baseret på miljø i global_config.""" + + base_path = os.environ.get("PMROOTDIR") + if not base_path: + logger.error("Miljøvariablen 'PMROOTDIR' er ikke sat.") + raise EnvironmentError("Miljøvariablen 'PMROOTDIR' er ikke sat.") + + if global_config["miljø"].lower() == "prd": + env = "BASE_PROD" + host = "sdpaseprdbase.ccta.dk" + port = "7001" + elif global_config["miljø"].lower() == "pre": + env = "BASE_PRE" + host = "sdpasepredb01.ccta.dk" + port = "7101" + elif global_config["miljø"].lower() == "udv": + env = "BASE_UDV" + host = "sdpaseudvdb01.ccta.dk" + port = "7301" + else: + logger.error(f"Ukendt database-miljø: '{global_config['miljø']}'.") + raise ValueError(f"Ukendt database-miljø: '{global_config['miljø']}'.") + + filnavn = os.path.join(base_path, "tools", "nogler", "pwd.json") + + brugernavn = "" + password = "" + + try: + with open(filnavn, 'r', encoding='utf-8') as fil: + data = json.load(fil) + + for nøgle, værdier in data.items(): + if nøgle == env: + if len(værdier) >= 2: + brugernavn = værdier[0] + password = base64.b64decode(værdier[1]).decode("utf-8") + else: + logger.error("Fejl i data, ikke nok værdier!") + raise ValueError(f"Fejl i credentials-fil: ikke nok værdier for '{env}'.") + + except FileNotFoundError: + logger.error(f"Credentials-filen '{filnavn}' blev ikke fundet.") + raise + + except json.JSONDecodeError as e: + logger.error(f"Fejl ved indlæsning af JSON fra '{filnavn}': {e}") + raise + + if brugernavn == "": + raise ValueError("Kunne ikke finde brugernavn og adgangskode i credentials-filen.") + + return brugernavn, password, env, host, port diff --git a/udpak_semistruktur/extract/reader.py b/udpak_semistruktur/extract/reader.py index e69de29..de400ea 100644 --- a/udpak_semistruktur/extract/reader.py +++ b/udpak_semistruktur/extract/reader.py @@ -0,0 +1,129 @@ +import os +import json +import codecs +import re +import xmltodict + +from pathlib import Path +from typing import Any, Generator, Optional +from charset_normalizer import from_path +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def detect_file_info(file_path: str) -> tuple[str, str]: + """ + Returnerer (file_type, encoding). + file_type ∈ {'xml','json','ukendt'}. + Robust mod BOM og UTF-16/32. + """ + # --- filendelse som hint --- + ext = os.path.splitext(file_path)[1].lower() + ext_hint = "xml" if ext == ".xml" else "json" if ext == ".json" else None + + # --- sniff bytes --- + with open(file_path, "rb") as f: + data = f.read(4096) + + if not data: + # tom fil → ukendt, men behold evt. hint + enc_guess = "utf-8" + return (ext_hint or "ukendt", enc_guess) + + # strip BOMs (for at gøre type-sniff lettere) + for bom in (codecs.BOM_UTF8, codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE, + codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE): + if data.startswith(bom): + data = data[len(bom):] + break + + # tillad nulbytes imellem tegn (UTF-16/32) + def spaced_bytes(s: bytes) -> bytes: + return b"".join(re.escape(bytes([b])) + b"\x00*" for b in s) + + xml_patterns = [ + re.compile(spaced_bytes(b" Generator[Any, None, None]: + """ + Generator der læser én eller flere filer: + - Sniffer type+encoding (detect_file_info) + - Åbner med korrekt encoding (fallback ved decode-fejl) + - Parser JSON/XML (xmltodict) + - keep_prefixes=True: bevar 'cm:Tag' (anbefalet til din præfiks-fallback) + keep_prefixes=False: brug process_namespaces=True → '{uri}LocalName' + """ + stop_file_path = os.path.join(config["output_path"], "stop.txt") + + def load_file(file_path): + file_type, encoding = detect_file_info(file_path) + logger.info(f"Læser filen: {file_path} (type={file_type}, encoding={encoding})") + config["current_file"] = os.path.basename(file_path) + + # Læs med detekteret encoding; fallback hvis nødvendigt + try: + with open(file_path, 'r', encoding=encoding) as f: + content = f.read() + except UnicodeDecodeError: + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + content = f.read() + + if file_type == "json": + return json.loads(content) + elif file_type == "xml": + if keep_prefixes: + # bevar 'cm:Tag' nøgler → passer til din eksisterende prefix-fallback + return xmltodict.parse(content) + else: + # brug '{uri}LocalName' nøgler (kræver din URI-matchlogik) + return xmltodict.parse(content, process_namespaces=True) + else: + raise ValueError(f"Ukendt eller ikke-understøttet filtype: {file_path}") + + if input_fil: + yield load_file(input_fil) + + elif input_fil_liste: + logger.info("Læser fillisten: " + input_fil_liste) + with open(input_fil_liste, 'r', encoding="utf-8") as f: + for file_name in f.read().splitlines(): + fil = Path(stop_file_path) + if fil.is_file(): + logger.info("Stop-fil fundet – afbryder fillæsning.") + return + if not file_name.strip(): + continue + yield load_file(file_name) diff --git a/udpak_semistruktur/extract/traversal.py b/udpak_semistruktur/extract/traversal.py index e69de29..38a006f 100644 --- a/udpak_semistruktur/extract/traversal.py +++ b/udpak_semistruktur/extract/traversal.py @@ -0,0 +1,217 @@ +from typing import Any, Generator, Union, List +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def _extract_text_node(v: Any) -> Any: + """ + Udtrækker ren tekstværdi fra et xmltodict-node hvis muligt. + Har noden '#text' og ellers kun '@...'-nøgler, returneres v['#text']. + Ellers returneres v uændret. + """ + if isinstance(v, dict) and "#text" in v: + other_keys = [k for k in v.keys() if k != "#text"] + if all(k.startswith("@") for k in other_keys): + return v["#text"] + return v + +def _resolve_with_indices(obj: Any, path: str, sti_index: dict, base_path: str = "") -> Any: + """ + Følger en dot-sti og bruger sti_index til at vælge elementer + hver gang vi rammer en liste. base_path er den allerede-resolverede + sti (brug den når du starter nede i en 'rod'). + Eksempler på sti_index: + {"orders": 1, "orders.items": 0} + """ + if path in (None, "", "."): + return obj + + parts = [p for p in path.split(".") if p != ""] + cur = obj + acc = base_path.strip(".") + + for key in parts: + # Hvis vi står på en liste, vælg det aktuelle index for den akkumulerede sti + if isinstance(cur, list): + idx_key = acc + idx = sti_index.get(idx_key, 0) + try: + cur = cur[idx] + except (TypeError, IndexError): + return None + + if key == "#text": + if isinstance(cur, dict): + cur = cur.get("#text") + acc = f"{acc}.#text" if acc else "#text" + continue + else: + return cur + + + # Slå næste nøgle op i dict med prefix-fallback + if isinstance(cur, dict): + match = find_nøgle_med_prefix_fallback(cur, key) + if match is None: + return None + cur = cur.get(match) + acc = match if not acc else f"{acc}.{match}" + else: + return None + + # Hvis vi ender på en liste, vælg index én sidste gang + if isinstance(cur, list): + idx = sti_index.get(acc, 0) + try: + cur = cur[idx] + except (TypeError, IndexError): + return None + + return _extract_text_node(cur) + +def sti_findes(obj: Any, sti: str, sti_index: dict) -> bool: + """ + Returnerer True hvis stien kan opløses relativt til obj. + Bruges af 'hvis_findes'. + """ + if not sti: + return False + + kandidat = _resolve_with_indices(obj, sti, sti_index, base_path="") + return kandidat is not None + +def matcher_hvis_findes(obj: Any, hvis_findes: Any, sti_index: dict) -> bool: + """ + OR-logik: + - hvis 'hvis_findes' ikke er angivet -> True + - hvis listen er tom -> True + - ellers True hvis mindst én sti findes + """ + if not hvis_findes: + return True + + if isinstance(hvis_findes, str): + hvis_findes = [hvis_findes] + + return any(sti_findes(obj, sti, sti_index) for sti in hvis_findes) + +def find_nøgle_med_prefix_fallback(obj: dict, nøgle: str) -> str | None: + """Slår en nøgle op i et dict med fallback til namespace-præfiks (fx 'cm:Navn').""" + if nøgle in obj: + return nøgle + for key in obj: + if ":" in key and key.split(":")[-1] == nøgle: + return key + return None + +def rekursiv_udpakning(obj, sti: Union[str, List[str]], base_path="", path_index=None): + if sti in [None, "", [], "."]: + yield obj, path_index or {} + return + + # Ny: rod="*" → iterér værdier i dict (og bevar nøglen i sti_index["__key"]) + if sti in ("*", ".*"): + if isinstance(obj, dict): + for k, v in obj.items(): + ny_index = (path_index or {}).copy() + ny_index["__key"] = k # så @key virker + yield v, ny_index + elif isinstance(obj, list): + for i, v in enumerate(obj): + ny_index = (path_index or {}).copy() + ny_index[(base_path or "").rstrip(".")] = i + yield v, ny_index + else: + # ikke-liste/dict → bare returnér objektet som er + yield obj, path_index or {} + return + + if isinstance(sti, str): + sti_dele = sti.split(".") + else: + sti_dele = sti + + if not sti_dele: + yield obj, path_index or {} + return + + nøgle = sti_dele[0] + resten = sti_dele[1:] + + # NYT: wildcard-sti-segment + if nøgle == "*": + if isinstance(obj, dict): + for k, v in obj.items(): + ny_index = (path_index or {}).copy() + ny_index["__key"] = k # så @key virker + # base_path + k + "." afspejler at vi "går ned" i dict'en + yield from rekursiv_udpakning(v, resten, base_path + k + ".", ny_index) + elif isinstance(obj, list): + for idx, v in enumerate(obj): + ny_index = (path_index or {}).copy() + ny_index[base_path.rstrip(".")] = idx + yield from rekursiv_udpakning(v, resten, base_path, ny_index) + return + + if isinstance(obj, dict): + matchende_nøgle = find_nøgle_med_prefix_fallback(obj, nøgle) + if matchende_nøgle is None: + return + + næste = obj.get(matchende_nøgle) + + ny_path_index = (path_index or {}).copy() + ny_path_index[(base_path + matchende_nøgle).strip(".")] = 0 + + if isinstance(næste, list): + for idx, element in enumerate(næste): + ny_path_index = (path_index or {}).copy() + ny_path_index[base_path + matchende_nøgle] = idx + yield from rekursiv_udpakning( + element, resten, base_path + matchende_nøgle + ".", ny_path_index + ) + else: + yield from rekursiv_udpakning( + næste, resten, base_path + matchende_nøgle + ".", path_index + ) + + elif isinstance(obj, list): + for idx, item in enumerate(obj): + ny_path_index = (path_index or {}).copy() + ny_path_index[base_path.rstrip(".")] = idx + yield from rekursiv_udpakning( + item, sti_dele, base_path, ny_path_index + ) + +def hent_objekt_fra_sti(root_obj: Any, sti: str, path_index: dict) -> Any: + """Returnerer objektet ved den angivne dot-sti relativt til root_obj.""" + return _resolve_with_indices(root_obj, sti, path_index, base_path="") + +def hent_fra_objekt_med_prefix_fallback(obj: Any, nøgle: str) -> Any: + """Slår nøgle op i obj med namespace-præfiks fallback og returnerer tekstnoden.""" + if not isinstance(obj, dict): + return None + + # 1. Direkte opslag + if nøgle in obj: + return _extract_text_node(obj[nøgle]) + + # 2. Hvis vi har 'CountryCode', så prøv fx 'c:CountryCode', 'cm:CountryCode' + for key in obj.keys(): + if ":" in key and key.split(":")[-1] == nøgle: + return _extract_text_node(obj[key]) + + return None + +def hent_felt(obj: Any, sti: str) -> Any: + """Følger en dot-sti i obj uden sti_index – bruges til simple opslag.""" + dele = sti.split('.') + for del_navn in dele: + if isinstance(obj, list): + return [hent_felt(item, '.'.join(dele[dele.index(del_navn):])) for item in obj] + elif isinstance(obj, dict): + obj = hent_fra_objekt_med_prefix_fallback(obj, del_navn) + else: + return None + return obj + diff --git a/udpak_semistruktur/load/db_writer.py b/udpak_semistruktur/load/db_writer.py index e69de29..a893a35 100644 --- a/udpak_semistruktur/load/db_writer.py +++ b/udpak_semistruktur/load/db_writer.py @@ -0,0 +1,99 @@ +import pyodbc +from typing import Any + +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def insert_rows_ase( + conn: pyodbc.Connection, + table_name: str, + columns: list[str], + rows: list[dict], + batch_size: int = 1000, + use_fast_executemany: bool = True, +) -> tuple[int, list[dict]]: + """ + Indsætter rows i ASE-tabellen i batches. + Returnerer (indsatte_antal, db_fejl_rows) hvor db_fejl_rows er liste af dicts + '_db_error'. + - Bruger fast_executemany hvis ønsket. + - Ved batch-fejl ruller den tilbage og prøver enkeltvis for at isolere fejlrækker. + """ + + if not rows: + return 0, [] + + # ASE tåler som regel ukvoterede kolonnenavne, men har I specialtegn/uppercases: brug "Col" + col_list = ", ".join(columns) + placeholders = ", ".join(["?"] * len(columns)) + sql = f"INSERT INTO {table_name} ({col_list}) VALUES ({placeholders})" + + logger.debug(f"INSERT til {table_name}: {len(rows)} rækker, batch_size={batch_size}") + + cur = conn.cursor() + if use_fast_executemany: + # I jeres tests gav det værdi—så sætter vi det + cur.fast_executemany = True + + params = [[r.get(c) for c in columns] for r in rows] + inserted = 0 + failed = [] + + try: + for i in range(0, len(params), batch_size): + chunk = params[i:i+batch_size] + try: + cur.executemany(sql, chunk) + inserted += len(chunk) + conn.commit() + except Exception: + logger.warning(f"Batch fejlede ved offset {i} – prøver enkeltvis") + # Batch fejlede – find de enkelte fejlrækker + conn.rollback() + for off, one in enumerate(chunk): + try: + cur.execute(sql, one) + inserted += 1 + except Exception as e_row: + # Bevar original data og tilføj fejl + bad = dict(rows[i + off]) + bad["_db_error"] = str(e_row) + failed.append(bad) + conn.commit() + return inserted, failed + except Exception as e: + logger.error(f"Kritisk fejl ved INSERT til {table_name}: {e}") + # Noget stort gik galt – markér alle som fejlet + conn.rollback() + for r in rows: + bad = dict(r) + bad["_db_error"] = str(e) + failed.append(bad) + return inserted, failed + +def get_ase_connection_windows( + user: str, + password: str, + host: str, + port: str, + database: str, + autocommit: bool = False, +) -> pyodbc.Connection: + """Opretter og returnerer en pyodbc-forbindelse til Sybase ASE på Windows.""" + + conn_str = ( + f"DRIVER={{Adaptive Server Enterprise}};" + f"SERVER={host};" + f"PORT={port};" + f"DATABASE={database};" + f"UID={user};" + f"PWD={password};" + ) + conn = pyodbc.connect(conn_str, autocommit=autocommit) + + cur = conn.cursor() + cur.execute("SET QUOTED_IDENTIFIER ON") + cur.close() + + return conn + diff --git a/udpak_semistruktur/load/file_writer.py b/udpak_semistruktur/load/file_writer.py index e69de29..0444fde 100644 --- a/udpak_semistruktur/load/file_writer.py +++ b/udpak_semistruktur/load/file_writer.py @@ -0,0 +1,59 @@ +import os +import csv +import time + +from typing import Any, Callable + +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def skriv_fil_med_retry( + skrivefunktion: Callable, + filsti: str, + max_forsøg: int = 10, + ventetider: list[int] = None, +) -> Any: + """Forsøger at kalde skrivefunktion op til max_forsøg gange med stigende ventetid ved PermissionError.""" + + sidste_exception = None + + if ventetider is None: + ventetider = [2, 5, 10, 15, 20, 30, 40, 50, 60, 100] + + for i in range(max_forsøg): + try: + return skrivefunktion() + except PermissionError as e: + delay = ventetider[min(i, len(ventetider)-1)] + logger.warning(f"Fejl ved skrivning til fil '{filsti}' (forsøg {i+1}/{max_forsøg}) (venter: {delay} sekunder): {e}") + sidste_exception = e + time.sleep(delay) + raise sidste_exception + +def generer_filer_med_overskrifter( + overskrifter: bool, + output_file: str, + columns: list, + global_config: dict, +) -> None: + """Opretter output-fil og skriver kolonneoverskrifter hvis konfigureret.""" + + tilstand = global_config["skrivetilstand"] + enc = global_config["encoding"] + separator = global_config['separator'] + + logger.debug(f"generer_filer_med_overskrifter kaldt: fil={output_file}, tilstand={tilstand}, enc={enc}") + + if tilstand.upper() == 'W': + with open(output_file, 'w', encoding=enc) as out_fil: + if overskrifter: + out_fil.write(separator.join(col["navn"] if "navn" in col else f"Kolonne_{idx+1}" for idx, col in enumerate(columns)) + "\n") + + if tilstand.upper() == 'A': + # TODO: out_fil er ikke åbnet i 'A'-tilstanden – denne gren virker ikke korrekt + if os.path.exists(output_file): + if os.path.getsize(output_file) == 0: + if overskrifter: + out_fil.write(separator.join(col["navn"] if "navn" in col else f"Kolonne_{idx+1}" for idx, col in enumerate(columns)) + "\n") + diff --git a/udpak_semistruktur/transform/clean.py b/udpak_semistruktur/transform/clean.py index e69de29..3636684 100644 --- a/udpak_semistruktur/transform/clean.py +++ b/udpak_semistruktur/transform/clean.py @@ -0,0 +1,145 @@ +from typing import Any +from bs4 import BeautifulSoup +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def fjern_linjeskift(data: dict, file_config: dict, global_config: dict) -> dict: + """Fjerner linjeskift fra kolonner markeret med fjern_linjeskift: true i YAML.""" + if not isinstance(data, dict) or "rækker" not in data: + return data + + kolonner = file_config.get("kolonner", []) + + for kolonne in kolonner: + if not kolonne.get("fjern_linjeskift", False): + continue + kolonnenavn = kolonne.get("navn") + for række in data["rækker"]: + værdi = række.get(kolonnenavn) + if isinstance(værdi, str): + værdi_ny = værdi.replace("\r\n", "").replace("\n", "").replace("\r", "") + else: + værdi_ny = værdi + + række[kolonnenavn] = værdi_ny + return data + +def perform_strip(text: Any) -> Any: + """Fjerner HTML/XML-tags fra en tekststreng via BeautifulSoup.""" + if not text or "<" not in text: # Hurtigt tjek om der overhovedet er tags + return text + + # Detekter om det er XML/XHTML eller almindelig HTML + parser = "xml" if " og blok-tags + # (h1, p) ikke mases sammen. strip=True fjerner whitespace i start/slut. + return soup.get_text(separator="\n", strip=True) + except Exception: + # Hvis parseren fejler, returneres den rå tekst (sikkerhedsnet) + return text + +def tag_strip(data: dict, file_config: dict, global_config: dict) -> dict: + """Stripper HTML/XML-tags fra kolonner markeret med strip_tags: true i YAML.""" + if not isinstance(data, dict) or "rækker" not in data: + return data + for kolonne in file_config.get("kolonner", []): + navn = kolonne.get("navn") + if not navn: + continue + + strip_tags = kolonne.get("strip_tags", False) + + for række in data["rækker"]: + v = række.get(navn) + if v is None: + continue + if strip_tags: + # Vi konverterer til string og stripper tags i ét hug + række[navn] = perform_strip(str(v)) + else: + # Hvis der ikke skal strippes tags, bevares værdien (som string) + række[navn] = str(v) + + return data + +def upper_lower(data: dict, file_config: dict, global_config: dict) -> dict: + """Konverterer kolonneværdier til upper- eller lowercase efter YAML-konfiguration.""" + if not isinstance(data, dict) or "rækker" not in data: + return data + + for kolonne in file_config.get("kolonner", []): + navn = kolonne.get("navn") + if not navn: + continue + + ucase = kolonne.get("ucase", False) + lcase = kolonne.get("lcase", False) + if ucase and lcase: + continue # undgå konflikt + + for række in data["rækker"]: + v = række.get(navn) + if v is None: + continue + s = str(v) + if ucase: + række[navn] = s.upper() + elif lcase: + række[navn] = s.lower() + return data + +def rens(data: dict, file_config: dict, global_config: dict) -> dict: + """Anvender regex- og tegnbaserede renseregler fra global_config på markerede kolonner.""" + if not isinstance(data, dict) or "rækker" not in data: + return data + + rens_regex = global_config.get("rens_intervaller_regex") + rens_erstats = global_config.get("rens_intervaller_erstats", []) + rens_chars = global_config.get("rens_all_chars", {}) + rens_regex_rules = global_config.get("rens_regex_rules", []) + + # ingenting at gøre? + if rens_regex is None and not rens_chars and not rens_regex_rules: + return data + + kolonner = file_config.get("kolonner", []) + + for kolonne in kolonner: + if not kolonne.get("rens", False): + continue + + kolonnenavn = kolonne.get("navn") + for række in data["rækker"]: + værdi = række.get(kolonnenavn) + + if isinstance(værdi, str): + tmp = værdi + + # 1) Frie regex-regler (kører først) + for cre, repl in rens_regex_rules: + # brug match.expand for at understøtte \1 / \g<1> i YAML + tmp = cre.sub(lambda m, r=repl: m.expand(r), tmp) + + # 2) Intervaller (som før) + if rens_regex: + def regex_erstat(match): + for group_name, val in match.groupdict().items(): + if val: + index = int(group_name[1:]) # fx 'i0' → 0 + return rens_erstats[index] + return match.group(0) + tmp = rens_regex.sub(regex_erstat, tmp) + + # 3) Enkelttegn (som før) + for char, erstat in rens_chars.items(): + tmp = tmp.replace(char, erstat) + else: + tmp = værdi + + række[kolonnenavn] = tmp + + return data diff --git a/udpak_semistruktur/transform/convert.py b/udpak_semistruktur/transform/convert.py index e69de29..69df4cc 100644 --- a/udpak_semistruktur/transform/convert.py +++ b/udpak_semistruktur/transform/convert.py @@ -0,0 +1,195 @@ +import re + +from datetime import datetime +from decimal import Decimal, ROUND_HALF_UP +from typing import Any, Optional +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def _parse_number(value: Any) -> Decimal: + """ + Konverterer en talværdi til Decimal. + Håndterer EU-format (punktum som tusindtalsseparator, komma som decimal) + samt US-format og rene integers. Kaster ValueError ved ugyldigt format. + """ + + if isinstance(value, (int, float, Decimal)): + return Decimal(str(value)) + + s = str(value).strip() + # Fjern valuta og ikke-tal (men bevar cifre, komma, punktum og minus) + s = re.sub(r'[^\d,.\-]', '', s) + + # Håndter typiske EU-formater + if ',' in s and '.' in s: + # Antag '.' = tusind, ',' = decimal, fx "391.211,75" -> "391211.75" + s = s.replace('.', '').replace(',', '.') + elif ',' in s: + # Kun komma => decimal komma, fx "391211,75" -> "391211.75" + s = s.replace(',', '.') + else: + # Kun punktum eller rene cifre -> int/US-format + pass + + # Tom streng eller bare "-" er ugyldig + if s in ('', '-', '.'): + raise ValueError(f"Ugyldigt talformat: {value!r}") + + return Decimal(s) + +def konverter(data: dict, file_config: dict, global_config: dict) -> dict: + """ + Konverterer kolonneværdier i data til de typer der er angivet i file_config. + Håndterer string, integer, float, decimal, boolean og date. + Rækker med fejl samles i data['fejlede_rækker'] hvis fejl_fil er konfigureret, + ellers kastes en exception. + """ + if not isinstance(data, dict) or "rækker" not in data: + return data + + fejl_fil = global_config.get("fejl_fil_ext", None) + kolonner = file_config.get("kolonner", []) + + nye_rækker = [] + fejlede_rækker = [] + + for række in data["rækker"]: + ny_række = række.copy() + fejl_i_række = False + + for kolonne in kolonner: + field_type = kolonne.get("type", "string") + string_max_len = kolonne.get("max_længde", None) + string_truncate = kolonne.get("truncate", None) + + if field_type == "string" and not string_max_len and not string_truncate: + continue + + kolonnenavn = kolonne.get("navn") + value = ny_række.get(kolonnenavn) + + krav = kolonne.get("påkrævet", False) + + # Hvis værdien er None + if value is None: + if krav: + logger.warning(f"Påkrævet felt '{kolonnenavn}' mangler.") + if fejl_fil: + fejl_i_række = True + break + else: + raise ValueError(f"Påkrævet felt '{kolonnenavn}' mangler.") + else: + continue # spring konvertering over for denne kolonne + + try: + if field_type in ["integer", "biginteger", "bigint"]: + tmp = int(value) + elif field_type in ["float", "decimal"]: + dec = _parse_number(value) + decimal_places = kolonne.get("decimaler", 2) + q = Decimal(10) ** (-decimal_places) # fx 2 -> Decimal('0.01') + dec = dec.quantize(q, rounding=ROUND_HALF_UP) + + if field_type == "float": + tmp = f"{dec:.{decimal_places}f}" + else: + # "decimal" som streng bevaret med præcision + tmp = f"{dec:.{decimal_places}f}" + + elif field_type == "boolean": + tmp = str(value).lower() in ["true", "1", "ja"] + elif field_type == "date": + tmp_value = str(value) # altid str + if '[' in tmp_value and tmp_value.endswith(']'): + tmp_value = tmp_value[:tmp_value.index('[')] + + dato_ind_raw = kolonne.get("dato_ind", global_config.get("dato_ind")) + dato_ud = kolonne.get("dato_ud", global_config.get("dato_ud")) + + # Tillad både string og liste + if isinstance(dato_ind_raw, str): + dato_ind_liste = [dato_ind_raw] + elif isinstance(dato_ind_raw, list): + dato_ind_liste = dato_ind_raw + else: + raise ValueError(f"'dato_ind' skal være streng eller liste, men var: {type(dato_ind_raw)}") + + tmp_dato = None + parse_errors = [] + + for dato_ind in dato_ind_liste: + try: + if "%f" in dato_ind: + if re.search(r'\.\d+', tmp_value): + tmp_value_padded = re.sub( + r'\.(\d{1,6})', + lambda m: '.' + m.group(1).ljust(6, '0'), + tmp_value + ) + else: + dato_ind = dato_ind.replace(".%f", "") + tmp_value_padded = tmp_value + else: + tmp_value_padded = tmp_value + + # Fjern kolon i tidszonedelen: +03:00 → +0300, hvis %z bruges + if "%z" in dato_ind: + tmp_value_padded = re.sub(r'([+-]\d{2}):(\d{2})$', r'\1\2', tmp_value_padded) + + tmp_dato = datetime.strptime(tmp_value_padded, dato_ind) + break # succes! + except ValueError as e: + parse_errors.append(f" - Format: {dato_ind} -> {e}") + + if tmp_dato is None: + fejlbesked = "\n".join(parse_errors) + raise ValueError(f"Kunne ikke parse dato '{tmp_value}' med nogen af formaterne:\n{fejlbesked}") + + # Output-format + if dato_ud.upper().strip() == "SYBASE": + tmp = tmp_dato.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + elif dato_ud.upper().strip() == "DATE": + tmp = tmp_dato.strftime('%Y-%m-%d') + elif dato_ud.upper().strip() == "INFORMATICA_US": + tmp = tmp_dato.strftime('%m/%d/%Y %H:%M:%S.%f') + else: + tmp = tmp_dato.strftime(dato_ud) + + + elif field_type == "string": + value = str(value) + if string_max_len and len(value) > string_max_len: + tmp = value[:string_max_len - 3] + "..." + elif string_truncate and len(value) > string_truncate: + tmp = value[:string_truncate] + else: + tmp = value + elif field_type in ["hash", "id", "file", "rod_variant"]: + tmp = value + else: + raise ValueError(f"Ukendt datatype '{field_type}' for feltet '{kolonnenavn}'.") + + ny_række[kolonnenavn] = tmp + + except (ValueError, TypeError) as e: + logger.error(f"[CONVERT]Fejl ved konvertering af felt '{kolonnenavn}' med værdi '{value}': {e}") + if fejl_fil: + fejl_i_række = True + break # Stop konvertering af denne række + else: + raise e + + if fejl_i_række: + fejlede_rækker.append(række) # Tilføj original række + else: + nye_rækker.append(ny_række) + + # Overskriv med gyldige rækker + data["rækker"] = nye_rækker + + if fejl_fil: + data["fejlede_rækker"] = fejlede_rækker + + return data diff --git a/udpak_semistruktur/transform/reshape.py b/udpak_semistruktur/transform/reshape.py index e69de29..a529901 100644 --- a/udpak_semistruktur/transform/reshape.py +++ b/udpak_semistruktur/transform/reshape.py @@ -0,0 +1,190 @@ +from itertools import product +from typing import Any + +from udpak_semistruktur.logger import hent_logger + +logger = hent_logger(__name__) + +def _opfylder_betingelse(værdi: Any, operator: str, sammenlign_værdi: str) -> bool: + """Evaluerer om værdi opfylder betingelsen defineret af operator og sammenlign_værdi.""" + + if værdi is None: + return False + + # Lidt spidsfindig. Men hvis tal sammenligning skal virke korrekt, så skal man bruge tal. - Ellers prøver vi med strenge. + try: + sammenlign = float(sammenlign_værdi) + værdi = float(værdi) + except (ValueError, TypeError): + sammenlign = str(sammenlign_værdi) + værdi = str(værdi) + + if operator == "=": + return værdi == sammenlign + elif operator in ("<>", "!="): + return værdi != sammenlign + elif operator == ">": + return værdi > sammenlign + elif operator == "<": + return værdi < sammenlign + elif operator == ">=": + return værdi >= sammenlign + elif operator == "<=": + return værdi <= sammenlign + return False + +def flatten(data: dict, file_config: dict) -> dict: + """Ekspanderer listede kolonner til separate rækker via kartesisk produkt.""" + + if not isinstance(data, dict) or "rækker" not in data: + return data + + kolonner = file_config.get("kolonner") + flatten_kolonner = [k for k in kolonner if k.get("flatten", False)] + flatten_kolonnenavne = [k.get("navn") for k in flatten_kolonner] + + nye_rækker = [] + + for række in data["rækker"]: + # Saml værdier fra flatten-kolonner + flatten_values = [] + for navn in flatten_kolonnenavne: + værdi = række.get(navn) + if isinstance(værdi, list): + flatten_values.append(værdi) + else: + flatten_values.append([værdi]) # Gør til liste for at indgå i produkt + + # Alle kombinationer + for kombi in product(*flatten_values): + ny_række = {k: v for k, v in række.items() if k not in flatten_kolonnenavne} + ny_række.update(dict(zip(flatten_kolonnenavne, kombi))) + nye_rækker.append(ny_række) + + data["rækker"] = nye_rækker + return data + +def join(data: dict, file_config: dict) -> dict: + """Samler listeværdier i kolonner til en enkelt streng med separator.""" + + if not isinstance(data, dict) or "rækker" not in data: + return data + + kolonner = file_config["kolonner"] + for kolonne in kolonner: + join_kolonne = kolonne.get("join", False) + if join_kolonne: + separator = kolonne.get("join_separator", '|') + + for række in data['rækker']: + værdi = række.get(kolonne['navn']) + if isinstance(værdi, list): + tmp = separator.join(str(v) for v in værdi) + else: + tmp = værdi + + række[kolonne['navn']] = tmp + + return data + +def where(data: dict, file_config: dict, global_config: dict) -> dict: + """Filtrerer rækker baseret på where-betingelser defineret i YAML.""" + + if not isinstance(data, dict) or "rækker" not in data: + return data + + kolonner = file_config.get("kolonner", []) + + for kolonne in kolonner: + if not kolonne.get("where", None): + continue + + condition_streng = kolonne.get("where") + operators = ["<>", "!=", ">=", "<=", "=", ">", "<"] + operator = None + value = None + + for op in operators: + if condition_streng.startswith(op): + operator = op + value = condition_streng[len(op):] + break + + if operator is None: + continue + + kolonnenavn = kolonne.get("navn") + + rækker = [r for r in data["rækker"] if _opfylder_betingelse(r.get(kolonnenavn), operator, value)] + + data["rækker"] = rækker + return data + +def id_felt(data: dict, file_config: dict) -> dict: + """Tildeler auto-incrementerende id-værdier til kolonner af type 'id'.""" + + if not isinstance(data, dict) or "rækker" not in data: + return data + + kolonner = file_config.get("kolonner", []) + + for kolonne in kolonner: + felt_type = kolonne.get("type", None) + if felt_type == 'id': + start = kolonne.get("startværdi", 1) + forøgelse = kolonne.get("forøgelse", 1) + navn = kolonne.get("navn") + + val = start + for række in data['rækker']: + række[navn] = val + val += forøgelse + + return data + +def sammensat_noegle(data: dict, file_config: dict, global_config: dict) -> dict: + """ + Bygger en sammensat nøgle ud fra andre kolonner i samme række. + + YAML på kolonnen: + - type: sammensat_nøgle + - felter: [colA, colB, ...] (kolonnenavne) + - separator: "|" (default: "|") + - allow_empty: false (default: false) + - false => hvis et felt mangler/er tomt => resultat None + - true => tomme felter bliver til "" og joines stadig + """ + if not isinstance(data, dict) or "rækker" not in data: + return data + + kolonner = file_config.get("kolonner", []) + for kol in kolonner: + if str(kol.get("type", "")).lower() != "sammensat_nøgle": + continue + + target = kol.get("navn") + felter = kol.get("felter") or kol.get("fields") + if not target or not isinstance(felter, list) or len(felter) == 0: + raise ValueError("sammensat_nøgle kræver 'navn' og 'felter: [..]' i YAML") + + sep = kol.get("separator", "|") + allow_empty = bool(kol.get("allow_empty", False)) + + for række in data["rækker"]: + parts = [] + missing = False + for f in felter: + v = række.get(f) + if v is None or v == "": + if allow_empty: + parts.append("") + else: + missing = True + break + else: + parts.append(str(v)) + + række[target] = None if missing else sep.join(parts) + + return data +