Files
udpak_semistruktur/udpak_semistruktur/config.py
2026-04-04 20:45:35 +02:00

475 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import re
import yaml
import hashlib
from typing import Any
from datetime import datetime
from copy import deepcopy
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
# =========================
# Hjælpefunktioner
# =========================
def _resolve_include_path(base_file: str, include_path: str) -> str:
"""Gør relative include-stier relative til YAML-filens mappe."""
if os.path.isabs(include_path):
return include_path
base_dir = os.path.dirname(os.path.abspath(base_file))
return os.path.abspath(os.path.join(base_dir, include_path))
def _deep_merge_dicts(a: dict, b: dict) -> dict:
"""Dyb-merge a <- b. Lister overskrives (undtagen håndteres særskilt i deep_merge_lists)."""
a = a or {}
b = b or {}
out = dict(a)
for k, v in b.items():
if isinstance(v, dict) and isinstance(out.get(k), dict):
out[k] = _deep_merge_dicts(out[k], v)
else:
out[k] = deepcopy(v)
return out
def deep_merge_lists(base_cfg: dict, incoming_cfg: dict) -> dict:
"""
Robust top-level merge a <- b:
- Dicts merges rekursivt.
- For nøglen 'output_filer' APPEND'er vi (liste + liste).
- Øvrige lister overskrives.
Håndterer at en af parterne kan være None.
"""
base_cfg = base_cfg or {}
incoming_cfg = incoming_cfg or {}
if not isinstance(base_cfg, dict) or not isinstance(incoming_cfg, dict):
return deepcopy(incoming_cfg)
out = dict(base_cfg)
for key, value in incoming_cfg.items():
if key == "output_filer":
base_list = out.get("output_filer")
if not isinstance(base_list, list):
base_list = [] if base_list is None else [base_list]
if isinstance(value, list):
out["output_filer"] = base_list + deepcopy(value)
elif value is None:
out["output_filer"] = base_list
else:
out["output_filer"] = base_list + [deepcopy(value)]
else:
existing = out.get(key)
if isinstance(existing, dict) and isinstance(value, dict):
out[key] = deep_merge_lists(existing, value)
else:
out[key] = deepcopy(value)
return out
def _expand_output_groups_new_only(cfg: dict) -> dict:
"""
Kræv den nye grupperede form for 'output_filer':
- hvert element: { navn?, rod, kolonner(list), outputs: [ {type:'fil'|'tabel', ...}, ... ] }
- 'type' kan udelades hvis fil/tabel kan udledes (fil_navn/tabel_navn).
- 'overskrifter' på gruppeniveau bruges som default til fil-børn, hvis de ikke selv sætter det.
Returnerer cfg med flad 'output_filer'.
"""
if "output_filer" not in cfg or not isinstance(cfg["output_filer"], list):
raise ValueError("'output_filer'-sektionen skal være en liste af grupper.")
flattened = []
for idx, group in enumerate(cfg["output_filer"]):
if not isinstance(group, dict):
raise ValueError(f"'output_filer' element #{idx+1} skal være et objekt.")
if "outputs" not in group or not isinstance(group["outputs"], list) or not group["outputs"]:
raise ValueError(f"Gruppe #{idx+1} mangler en ikke-tom 'outputs'-liste.")
# Fælles felter KRÆVES
if "rod" not in group:
raise ValueError(f"Gruppe #{idx+1} mangler 'rod'.")
if "kolonner" not in group or not isinstance(group["kolonner"], list):
raise ValueError(f"Gruppe #{idx+1} mangler 'kolonner' (liste).")
common = {
"rod": group["rod"],
"kolonner": deepcopy(group["kolonner"]),
}
if "hvis_findes" in group:
common["hvis_findes"] = deepcopy(group["hvis_findes"])
group_default_headers = group.get("overskrifter", None) # optional default for filer
for cidx, child in enumerate(group["outputs"]):
if not isinstance(child, dict):
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal være et objekt.")
# Udled type, hvis mulig
ctype = child.get("type")
if ctype is None:
if "fil_navn" in child:
ctype = "fil"
elif "tabel_navn" in child:
ctype = "tabel"
elif "tabel" in child and isinstance(child["tabel"], dict):
ctype = "tabel_avanceret"
else:
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} mangler 'type' og kan ikke udledes.")
if ctype not in ("fil", "tabel", "tabel_avanceret"):
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} har ukendt type '{ctype}'.")
merged = _deep_merge_dicts(common, child)
# Sikr præcis én destination
has_file = "fil_navn" in merged
has_table = "tabel_navn" in merged
has_avanceret = "tabel" in merged and isinstance(merged.get("tabel"), dict)
if has_file and has_table:
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} må ikke have både 'fil_navn' og 'tabel_navn'.")
if not has_file and not has_table and not has_avanceret:
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal have enten 'fil_navn', 'tabel_navn' eller 'tabel'.")
# 'overskrifter' gælder kun for filer; brug gruppens default hvis ikke sat
if ctype == "fil":
if "overskrifter" not in merged and group_default_headers is not None:
merged["overskrifter"] = group_default_headers
else:
merged.pop("overskrifter", None)
# Lidt housekeeping
merged["type"] = ctype
if "navn" in group:
merged["_gruppe_navn"] = group["navn"]
flattened.append(merged)
out = dict(cfg)
out["output_filer"] = flattened
return out
# =========================
# Hovedfunktion
# =========================
def valider_yaml(yaml_file_path: str) -> dict:
"""
Validerer YAML-konfigurationen og udvider env-variabler.
Understøtter KUN den nye grupperede model for 'output_filer'.
"""
# 1) Eksistens
if not os.path.exists(yaml_file_path):
raise FileNotFoundError(f"Konfigurationsfilen '{yaml_file_path}' findes ikke.")
# 2) Indlæs base
with open(yaml_file_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
# 3) Env-vars før include
config = erstat_env_vars(config)
# 4) Include
combined = {k: v for k, v in config.items() if k != "include"}
included_files = config.get("include", [])
if isinstance(included_files, str):
included_files = [included_files]
for inc in included_files:
resolved = _resolve_include_path(yaml_file_path, inc)
if not os.path.exists(resolved):
raise FileNotFoundError(f"Inkluderet YAML-fil '{inc}' findes ikke!")
with open(resolved, "r", encoding="utf-8") as f:
inc_cfg = yaml.safe_load(f) or {}
inc_cfg = erstat_env_vars(inc_cfg)
combined = deep_merge_lists(combined, inc_cfg)
config = combined
config.pop("include", None)
# 5) Kræv 'config'
if "config" not in config:
raise ValueError("YAML-filen skal indeholde en 'config'-sektion.")
# 6) Env-vars igen (efter merge)
config = erstat_env_vars(config)
global_config = config["config"]
# 7) Rensedata
rens_intervaller = []
rens_intervaller_erstats = []
rens_chars = {}
rens_cfg = config.get("rens", {}) or {}
for group_name, group_items in rens_cfg.items():
if group_name == "regex":
continue
if not isinstance(group_items, list):
continue
for item in group_items:
if isinstance(item, dict):
tegn = item.get("tegn")
erstat = item.get("erstat", "")
else:
tegn = item
erstat = ""
if isinstance(tegn, str) and "-" in tegn and len(tegn) >= 3:
rens_intervaller.append(tegn)
rens_intervaller_erstats.append(erstat)
else:
rens_chars[tegn] = erstat
if rens_intervaller:
parts = [f"(?P<i{i}>[{interval}])" for i, interval in enumerate(rens_intervaller)]
rens_regex = re.compile("|".join(parts), flags=re.UNICODE)
else:
rens_regex = None
rens_regex_rules = []
for rx in rens_cfg.get("regex", []) or []:
pattern = rx.get("mønster") or rx.get("monster") or rx.get("pattern")
repl = rx.get("erstat", "")
if pattern:
rens_regex_rules.append((re.compile(pattern, flags=re.UNICODE), repl))
global_config["rens_intervaller_regex"] = rens_regex
global_config["rens_intervaller_erstats"] = rens_intervaller_erstats
global_config["rens_all_chars"] = rens_chars
global_config["rens_regex_rules"] = rens_regex_rules
# 8) Output-sti
if "output_path" in global_config and global_config["output_path"] is not None:
sti = str(global_config["output_path"])
if not sti.endswith(os.sep):
sti += os.sep
global_config["output_path"] = sti
if not os.path.exists(global_config["output_path"]):
raise FileNotFoundError(f"Output-stien '{global_config['output_path']}' findes ikke.")
if not os.access(global_config["output_path"], os.W_OK):
raise PermissionError(f"Output-stien '{global_config['output_path']}' er ikke skrivbar.")
# 9) Dato
if "dato" in global_config and global_config["dato"] is not None:
if "dato_format" not in global_config:
raise ValueError("Hvis 'dato' er angivet i YAML, skal 'dato_format' også være til stede.")
try:
global_config["dato"] = datetime.strptime(str(global_config["dato"]), str(global_config["dato_format"]))
except ValueError:
raise ValueError(
f"Forkert datoformat i config: '{global_config['dato']}'. "
f"Forventet format: '{global_config['dato_format']}'."
)
else:
global_config["dato"] = datetime.now()
# 10) Output-filer (NY model -> flad)
config = _expand_output_groups_new_only(config)
# 11) Basisvalideringer pr. output
if "output_filer" not in config or not isinstance(config["output_filer"], list) or not config["output_filer"]:
raise ValueError("'output_filer' skal være en ikke-tom liste (efter grupper er udvidet).")
for file_cfg in config["output_filer"]:
if not isinstance(file_cfg, dict):
raise ValueError("Hvert output-element skal være et objekt.")
har_avanceret = isinstance(file_cfg.get("tabel"), dict)
if not har_avanceret:
if ("fil_navn" in file_cfg) == ("tabel_navn" in file_cfg):
raise ValueError("Hvert output skal have præcis én af 'fil_navn' eller 'tabel_navn'.")
if "rod" not in file_cfg:
raise ValueError("Hvert output skal have en 'rod'.")
if "kolonner" not in file_cfg or not isinstance(file_cfg["kolonner"], list):
raise ValueError("Hvert output skal have 'kolonner' som en liste.")
if "tabel_navn" in file_cfg or har_avanceret:
file_cfg.pop("overskrifter", None) # kun relevant for filer
if "hvis_findes" in file_cfg:
hf = file_cfg["hvis_findes"]
if isinstance(hf, str):
file_cfg["hvis_findes"] = [hf]
hf = file_cfg["hvis_findes"]
if not isinstance(hf, list):
raise ValueError("'hvis_findes' skal være en streng eller en liste af strenge.")
for i, sti in enumerate(hf):
if not isinstance(sti, str) or not sti.strip():
raise ValueError(f"'hvis_findes' element #{i+1} skal være en ikke-tom streng.")
# Hash-validering
hash_fields = [c for c in file_cfg["kolonner"] if isinstance(c, dict) and c.get("type") == "hash"]
if len(hash_fields) > 1:
raise ValueError("Der må maksimalt være én 'hash'-kolonne pr. output.")
if hash_fields:
algo = hash_fields[0].get("hash_algorithm", "sha256")
if algo not in hashlib.algorithms_guaranteed:
raise ValueError(f"Forkert HASH-algoritme: '{algo}'. Gyldige værdier: {', '.join(sorted(hashlib.algorithms_guaranteed))}")
# Kolonne-regler
for col in file_cfg["kolonner"]:
if not isinstance(col, dict):
continue # skabelon-kortform er OK
if col.get("type") == "hash" and "felt" in col:
raise ValueError("En 'hash'-kolonne må ikke have et 'felt'.")
if col.get("type") == "id" and "felt" in col:
raise ValueError("En 'id'-kolonne må ikke have et 'felt'.")
# Udfyld manglende kolonnenavne (ikke for skabelon/hash/id)
for i, col in enumerate(file_cfg["kolonner"]):
if isinstance(col, dict):
if not col.get("navn") and col.get("type") not in ("hash", "id") and "skabelon" not in col:
col["navn"] = f"kolonne_{i+1}"
# Dubletnavne
navne = [c.get("navn") for c in file_cfg["kolonner"] if isinstance(c, dict) and c.get("navn")]
dups = sorted({n for n in navne if navne.count(n) > 1})
if dups:
target = file_cfg.get("fil_navn", file_cfg.get("tabel_navn", "ukendt"))
raise ValueError(f"Kolonnenavne i output '{target}' indeholder dubletter: {', '.join(dups)}")
# 12) Input-krav
if not ("input_fil" in global_config or "input_fil_liste" in global_config):
raise ValueError("Der skal være enten 'input_fil' eller 'input_fil_liste' i 'config'-sektionen.")
if "input_fil" in global_config and "input_fil_liste" in global_config:
raise ValueError("Kun én af 'input_fil' eller 'input_fil_liste' må angives i 'config'.")
# 13) Defaults
global_config.setdefault("miljø", "udv")
global_config.setdefault("skrivetilstand", "w")
global_config.setdefault("encoding", "utf-8")
global_config.setdefault("separator", "\t")
global_config.setdefault("logfil", "log_{yyyy}{mm}{dd}.txt")
global_config.setdefault("dan_ok", True)
global_config.setdefault("global_rens", False)
global_config.setdefault("global_fjern_linjeskift", False)
global_config.setdefault("global_dato_ind", "%Y-%m-%d")
global_config.setdefault("global_dato_ud", "%Y-%m-%d")
global_config.setdefault("fejl_fil_ext", None)
global_config.setdefault("stop_ved_0_output", False)
global_config.setdefault("db_char_set", "latin-1")
global_config.setdefault("stop_ved_fejl", False)
# 14) Logfilnavn formateres med dato og placeres i output_path
global_config["logfil"] = os.path.join(
global_config["output_path"],
str(global_config["logfil"]).format(
yy=global_config["dato"].strftime("%y"),
yyyy=global_config["dato"].strftime("%Y"),
mm=global_config["dato"].strftime("%m"),
dd=global_config["dato"].strftime("%d"),
)
)
# 15) Timestamp
global_config["var_timestamp"] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
# 17) Udvid kolonne-skabeloner (din eksisterende funktion)
config = udvid_kolonne_skabeloner(config)
return config
def erstat_env_vars(config: Any) -> Any:
"""Erstatter miljøvariabler på formen ${VAR} rekursivt i config-strukturen."""
if isinstance(config, dict):
return {key: erstat_env_vars(value) for key, value in config.items()}
elif isinstance(config, list):
return [erstat_env_vars(item) for item in config]
elif isinstance(config, str):
# Find og erstat alle miljøvariabler
start_index = config.find("${")
while start_index != -1:
end_index = config.find("}", start_index)
if end_index == -1:
break # Ugyldigt format, stop
env_var = config[start_index + 2:end_index] # Udhent miljøvariabelnavn
env_value = os.getenv(env_var, None) # Hent værdi fra miljøet
if env_value is None:
raise ValueError(f"Miljøvariabel '{env_var}' er ikke sat, men bruges i config-filen!")
config = config.replace(f"${{{env_var}}}", env_value) # Erstat variablen
start_index = config.find("${") # Søg efter næste variabel
return config
else:
return config # Returnér uændret, hvis det ikke er en streng
def udvid_kolonne_skabeloner(config: dict) -> dict:
"""Ekspanderer kolonne-skabelon-referencer i output_filer til flade kolonner."""
skabeloner = config.get("kolonne_skabeloner", {})
def expand_template(skabelonnavn, prefix="", prefix_felt="", skabelon_rod=""):
if skabelonnavn not in skabeloner:
raise ValueError(f"Skabelon '{skabelonnavn}' findes ikke.")
udvidede = []
for item in skabeloner[skabelonnavn]:
# Tjek om dette element definerer en ny rod
aktuel_rod = item.get("rod", "")
ny_rod = skabelon_rod
if aktuel_rod:
if skabelon_rod and skabelon_rod != aktuel_rod:
raise ValueError(f"Konflikt: To rødder fundet ('{skabelon_rod}' og '{aktuel_rod}') i skabelon '{skabelonnavn}'.")
ny_rod = aktuel_rod
if "skabelon" in item:
under_skabelon = item["skabelon"]
under_prefix = prefix + item.get("prefix", item.get("prefix_navn", ""))
under_prefix_felt = prefix_felt + item.get("prefix_felt", "")
if under_prefix_felt and not under_prefix_felt.endswith("."):
under_prefix_felt += "."
# Giv den fundne rod videre til næste niveau
udvidede.extend(expand_template(under_skabelon, under_prefix, under_prefix_felt, ny_rod))
else:
nyt_item = item.copy()
nyt_item["navn"] = prefix + item["navn"]
if "felt" in nyt_item and nyt_item["felt"]:
nyt_item["felt"] = prefix_felt + item["felt"]
# Sæt roden hvis den findes, ellers fejler vi ikke her (da den kan mangle helt)
if ny_rod:
nyt_item["rod"] = ny_rod
udvidede.append(nyt_item)
return udvidede
for output_fil in config.get("output_filer", []):
nye_kolonner = []
for kol in output_fil.get("kolonner", []):
if "skabelon" in kol:
skabelon_rod = kol.get("rod", "")
prefix = kol.get("prefix", kol.get("prefix_navn", ""))
prefix_felt = kol.get("prefix_felt", "")
if prefix_felt:
if not prefix_felt.endswith("."):
prefix_felt += "."
nye_kolonner.extend(expand_template(
kol["skabelon"],
prefix,
prefix_felt,
skabelon_rod
))
else:
nye_kolonner.append(kol)
output_fil["kolonner"] = nye_kolonner
return config