Godt på vej

This commit is contained in:
2026-04-02 23:36:17 +02:00
parent b05f6b8857
commit 18526f8b45
9 changed files with 1567 additions and 0 deletions

View File

@@ -0,0 +1,471 @@
import os
import re
import yaml
import hashlib
from typing import Any
from datetime import datetime
from copy import deepcopy
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
# =========================
# Hjælpefunktioner
# =========================
def _resolve_include_path(base_file: str, include_path: str) -> str:
"""Gør relative include-stier relative til YAML-filens mappe."""
if os.path.isabs(include_path):
return include_path
base_dir = os.path.dirname(os.path.abspath(base_file))
return os.path.abspath(os.path.join(base_dir, include_path))
def _deep_merge_dicts(a: dict, b: dict) -> dict:
"""Dyb-merge a <- b. Lister overskrives (undtagen håndteres særskilt i deep_merge_lists)."""
a = a or {}
b = b or {}
out = dict(a)
for k, v in b.items():
if isinstance(v, dict) and isinstance(out.get(k), dict):
out[k] = _deep_merge_dicts(out[k], v)
else:
out[k] = deepcopy(v)
return out
def deep_merge_lists(base_cfg: dict, incoming_cfg: dict) -> dict:
"""
Robust top-level merge a <- b:
- Dicts merges rekursivt.
- For nøglen 'output_filer' APPEND'er vi (liste + liste).
- Øvrige lister overskrives.
Håndterer at en af parterne kan være None.
"""
base_cfg = base_cfg or {}
incoming_cfg = incoming_cfg or {}
if not isinstance(base_cfg, dict) or not isinstance(incoming_cfg, dict):
return deepcopy(incoming_cfg)
out = dict(base_cfg)
for key, value in incoming_cfg.items():
if key == "output_filer":
base_list = out.get("output_filer")
if not isinstance(base_list, list):
base_list = [] if base_list is None else [base_list]
if isinstance(value, list):
out["output_filer"] = base_list + deepcopy(value)
elif value is None:
out["output_filer"] = base_list
else:
out["output_filer"] = base_list + [deepcopy(value)]
else:
existing = out.get(key)
if isinstance(existing, dict) and isinstance(value, dict):
out[key] = deep_merge_lists(existing, value)
else:
out[key] = deepcopy(value)
return out
def _expand_output_groups_new_only(cfg: dict) -> dict:
"""
Kræv den nye grupperede form for 'output_filer':
- hvert element: { navn?, rod, kolonner(list), outputs: [ {type:'fil'|'tabel', ...}, ... ] }
- 'type' kan udelades hvis fil/tabel kan udledes (fil_navn/tabel_navn).
- 'overskrifter' på gruppeniveau bruges som default til fil-børn, hvis de ikke selv sætter det.
Returnerer cfg med flad 'output_filer'.
"""
if "output_filer" not in cfg or not isinstance(cfg["output_filer"], list):
raise ValueError("'output_filer'-sektionen skal være en liste af grupper.")
flattened = []
for idx, group in enumerate(cfg["output_filer"]):
if not isinstance(group, dict):
raise ValueError(f"'output_filer' element #{idx+1} skal være et objekt.")
if "outputs" not in group or not isinstance(group["outputs"], list) or not group["outputs"]:
raise ValueError(f"Gruppe #{idx+1} mangler en ikke-tom 'outputs'-liste.")
# Fælles felter KRÆVES
if "rod" not in group:
raise ValueError(f"Gruppe #{idx+1} mangler 'rod'.")
if "kolonner" not in group or not isinstance(group["kolonner"], list):
raise ValueError(f"Gruppe #{idx+1} mangler 'kolonner' (liste).")
common = {
"rod": group["rod"],
"kolonner": deepcopy(group["kolonner"]),
}
if "hvis_findes" in group:
common["hvis_findes"] = deepcopy(group["hvis_findes"])
group_default_headers = group.get("overskrifter", None) # optional default for filer
for cidx, child in enumerate(group["outputs"]):
if not isinstance(child, dict):
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal være et objekt.")
# Udled type, hvis mulig
ctype = child.get("type")
if ctype is None:
if "fil_navn" in child:
ctype = "fil"
elif "tabel_navn" in child:
ctype = "tabel"
else:
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} mangler 'type' og kan ikke udledes.")
if ctype not in ("fil", "tabel"):
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} har ukendt type '{ctype}'.")
merged = _deep_merge_dicts(common, child)
# Sikr præcis én destination
has_file = "fil_navn" in merged
has_table = "tabel_navn" in merged
if has_file and has_table:
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} må ikke have både 'fil_navn' og 'tabel_navn'.")
if not has_file and not has_table:
raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal have enten 'fil_navn' eller 'tabel_navn'.")
# 'overskrifter' gælder kun for filer; brug gruppens default hvis ikke sat
if ctype == "fil":
if "overskrifter" not in merged and group_default_headers is not None:
merged["overskrifter"] = group_default_headers
else:
merged.pop("overskrifter", None)
# Lidt housekeeping
merged["type"] = ctype
if "navn" in group:
merged["_gruppe_navn"] = group["navn"]
flattened.append(merged)
out = dict(cfg)
out["output_filer"] = flattened
return out
# =========================
# Hovedfunktion
# =========================
def valider_yaml(yaml_file_path: str) -> dict:
"""
Validerer YAML-konfigurationen og udvider env-variabler.
Understøtter KUN den nye grupperede model for 'output_filer'.
"""
# 1) Eksistens
if not os.path.exists(yaml_file_path):
raise FileNotFoundError(f"Konfigurationsfilen '{yaml_file_path}' findes ikke.")
# 2) Indlæs base
with open(yaml_file_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f) or {}
# 3) Env-vars før include
config = erstat_env_vars(config)
# 4) Include
combined = {k: v for k, v in config.items() if k != "include"}
included_files = config.get("include", [])
if isinstance(included_files, str):
included_files = [included_files]
for inc in included_files:
resolved = _resolve_include_path(yaml_file_path, inc)
if not os.path.exists(resolved):
raise FileNotFoundError(f"Inkluderet YAML-fil '{inc}' findes ikke!")
with open(resolved, "r", encoding="utf-8") as f:
inc_cfg = yaml.safe_load(f) or {}
inc_cfg = erstat_env_vars(inc_cfg)
combined = deep_merge_lists(combined, inc_cfg)
config = combined
config.pop("include", None)
# 5) Kræv 'config'
if "config" not in config:
raise ValueError("YAML-filen skal indeholde en 'config'-sektion.")
# 6) Env-vars igen (efter merge)
config = erstat_env_vars(config)
global_config = config["config"]
# 7) Rensedata
rens_intervaller = []
rens_intervaller_erstats = []
rens_chars = {}
rens_cfg = config.get("rens", {}) or {}
for group_name, group_items in rens_cfg.items():
if group_name == "regex":
continue
if not isinstance(group_items, list):
continue
for item in group_items:
if isinstance(item, dict):
tegn = item.get("tegn")
erstat = item.get("erstat", "")
else:
tegn = item
erstat = ""
if isinstance(tegn, str) and "-" in tegn and len(tegn) >= 3:
rens_intervaller.append(tegn)
rens_intervaller_erstats.append(erstat)
else:
rens_chars[tegn] = erstat
if rens_intervaller:
parts = [f"(?P<i{i}>[{interval}])" for i, interval in enumerate(rens_intervaller)]
rens_regex = re.compile("|".join(parts), flags=re.UNICODE)
else:
rens_regex = None
rens_regex_rules = []
for rx in rens_cfg.get("regex", []) or []:
pattern = rx.get("mønster") or rx.get("monster") or rx.get("pattern")
repl = rx.get("erstat", "")
if pattern:
rens_regex_rules.append((re.compile(pattern, flags=re.UNICODE), repl))
global_config["rens_intervaller_regex"] = rens_regex
global_config["rens_intervaller_erstats"] = rens_intervaller_erstats
global_config["rens_all_chars"] = rens_chars
global_config["rens_regex_rules"] = rens_regex_rules
# 8) Output-sti
if "output_path" in global_config and global_config["output_path"] is not None:
sti = str(global_config["output_path"])
if not sti.endswith(os.sep):
sti += os.sep
global_config["output_path"] = sti
if not os.path.exists(global_config["output_path"]):
raise FileNotFoundError(f"Output-stien '{global_config['output_path']}' findes ikke.")
if not os.access(global_config["output_path"], os.W_OK):
raise PermissionError(f"Output-stien '{global_config['output_path']}' er ikke skrivbar.")
# 9) Dato
if "dato" in global_config and global_config["dato"] is not None:
if "dato_format" not in global_config:
raise ValueError("Hvis 'dato' er angivet i YAML, skal 'dato_format' også være til stede.")
try:
global_config["dato"] = datetime.strptime(str(global_config["dato"]), str(global_config["dato_format"]))
except ValueError:
raise ValueError(
f"Forkert datoformat i config: '{global_config['dato']}'. "
f"Forventet format: '{global_config['dato_format']}'."
)
else:
global_config["dato"] = datetime.now()
# 10) Output-filer (NY model -> flad)
config = _expand_output_groups_new_only(config)
# 11) Basisvalideringer pr. output
if "output_filer" not in config or not isinstance(config["output_filer"], list) or not config["output_filer"]:
raise ValueError("'output_filer' skal være en ikke-tom liste (efter grupper er udvidet).")
for file_cfg in config["output_filer"]:
if not isinstance(file_cfg, dict):
raise ValueError("Hvert output-element skal være et objekt.")
if ("fil_navn" in file_cfg) == ("tabel_navn" in file_cfg):
raise ValueError("Hvert output skal have præcis én af 'fil_navn' eller 'tabel_navn'.")
if "rod" not in file_cfg:
raise ValueError("Hvert output skal have en 'rod'.")
if "kolonner" not in file_cfg or not isinstance(file_cfg["kolonner"], list):
raise ValueError("Hvert output skal have 'kolonner' som en liste.")
if "tabel_navn" in file_cfg:
file_cfg.pop("overskrifter", None) # kun relevant for filer
if "hvis_findes" in file_cfg:
hf = file_cfg["hvis_findes"]
if isinstance(hf, str):
file_cfg["hvis_findes"] = [hf]
hf = file_cfg["hvis_findes"]
if not isinstance(hf, list):
raise ValueError("'hvis_findes' skal være en streng eller en liste af strenge.")
for i, sti in enumerate(hf):
if not isinstance(sti, str) or not sti.strip():
raise ValueError(f"'hvis_findes' element #{i+1} skal være en ikke-tom streng.")
# Hash-validering
hash_fields = [c for c in file_cfg["kolonner"] if isinstance(c, dict) and c.get("type") == "hash"]
if len(hash_fields) > 1:
raise ValueError("Der må maksimalt være én 'hash'-kolonne pr. output.")
if hash_fields:
algo = hash_fields[0].get("hash_algorithm", "sha256")
if algo not in hashlib.algorithms_guaranteed:
raise ValueError(f"Forkert HASH-algoritme: '{algo}'. Gyldige værdier: {', '.join(sorted(hashlib.algorithms_guaranteed))}")
# Kolonne-regler
for col in file_cfg["kolonner"]:
if not isinstance(col, dict):
continue # skabelon-kortform er OK
if col.get("type") == "hash" and "felt" in col:
raise ValueError("En 'hash'-kolonne må ikke have et 'felt'.")
if col.get("type") == "id" and "felt" in col:
raise ValueError("En 'id'-kolonne må ikke have et 'felt'.")
# Udfyld manglende kolonnenavne (ikke for skabelon/hash/id)
for i, col in enumerate(file_cfg["kolonner"]):
if isinstance(col, dict):
if not col.get("navn") and col.get("type") not in ("hash", "id") and "skabelon" not in col:
col["navn"] = f"kolonne_{i+1}"
# Dubletnavne
navne = [c.get("navn") for c in file_cfg["kolonner"] if isinstance(c, dict) and c.get("navn")]
dups = sorted({n for n in navne if navne.count(n) > 1})
if dups:
target = file_cfg.get("fil_navn", file_cfg.get("tabel_navn", "ukendt"))
raise ValueError(f"Kolonnenavne i output '{target}' indeholder dubletter: {', '.join(dups)}")
# 12) Input-krav
if not ("input_fil" in global_config or "input_fil_liste" in global_config):
raise ValueError("Der skal være enten 'input_fil' eller 'input_fil_liste' i 'config'-sektionen.")
if "input_fil" in global_config and "input_fil_liste" in global_config:
raise ValueError("Kun én af 'input_fil' eller 'input_fil_liste' må angives i 'config'.")
# 13) Defaults
global_config.setdefault("miljø", "udv")
global_config.setdefault("skrivetilstand", "w")
global_config.setdefault("encoding", "utf-8")
global_config.setdefault("separator", "\t")
global_config.setdefault("logfil", "log_{yyyy}{mm}{dd}.txt")
global_config.setdefault("debug", False)
global_config.setdefault("dan_ok", True)
global_config.setdefault("global_rens", False)
global_config.setdefault("global_fjern_linjeskift", False)
global_config.setdefault("global_dato_ind", "%Y-%m-%d")
global_config.setdefault("global_dato_ud", "%Y-%m-%d")
global_config.setdefault("fejl_fil_ext", None)
global_config.setdefault("stop_ved_0_output", False)
global_config.setdefault("db_char_set", "latin-1")
# 14) Logfilnavn formateres med dato
global_config["logfil"] = str(global_config["logfil"]).format(
yy=global_config["dato"].strftime("%y"),
yyyy=global_config["dato"].strftime("%Y"),
mm=global_config["dato"].strftime("%m"),
dd=global_config["dato"].strftime("%d"),
)
# 15) Timestamp
global_config["var_timestamp"] = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")
# 16) Debug
if global_config.get("debug") is True:
logger.debug("YAML-konfigurationen er gyldig.")
# 17) Udvid kolonne-skabeloner (din eksisterende funktion)
config = udvid_kolonne_skabeloner(config)
return config
def erstat_env_vars(config: Any) -> Any:
"""Erstatter miljøvariabler på formen ${VAR} rekursivt i config-strukturen."""
if isinstance(config, dict):
return {key: erstat_env_vars(value) for key, value in config.items()}
elif isinstance(config, list):
return [erstat_env_vars(item) for item in config]
elif isinstance(config, str):
# Find og erstat alle miljøvariabler
start_index = config.find("${")
while start_index != -1:
end_index = config.find("}", start_index)
if end_index == -1:
break # Ugyldigt format, stop
env_var = config[start_index + 2:end_index] # Udhent miljøvariabelnavn
env_value = os.getenv(env_var, None) # Hent værdi fra miljøet
if env_value is None:
raise ValueError(f"Miljøvariabel '{env_var}' er ikke sat, men bruges i config-filen!")
config = config.replace(f"${{{env_var}}}", env_value) # Erstat variablen
start_index = config.find("${") # Søg efter næste variabel
return config
else:
return config # Returnér uændret, hvis det ikke er en streng
def udvid_kolonne_skabeloner(config: dict) -> dict:
"""Ekspanderer kolonne-skabelon-referencer i output_filer til flade kolonner."""
skabeloner = config.get("kolonne_skabeloner", {})
def expand_template(skabelonnavn, prefix="", prefix_felt="", skabelon_rod=""):
if skabelonnavn not in skabeloner:
raise ValueError(f"Skabelon '{skabelonnavn}' findes ikke.")
udvidede = []
for item in skabeloner[skabelonnavn]:
# Tjek om dette element definerer en ny rod
aktuel_rod = item.get("rod", "")
ny_rod = skabelon_rod
if aktuel_rod:
if skabelon_rod and skabelon_rod != aktuel_rod:
raise ValueError(f"Konflikt: To rødder fundet ('{skabelon_rod}' og '{aktuel_rod}') i skabelon '{skabelonnavn}'.")
ny_rod = aktuel_rod
if "skabelon" in item:
under_skabelon = item["skabelon"]
under_prefix = prefix + item.get("prefix", item.get("prefix_navn", ""))
under_prefix_felt = prefix_felt + item.get("prefix_felt", "")
if under_prefix_felt and not under_prefix_felt.endswith("."):
under_prefix_felt += "."
# Giv den fundne rod videre til næste niveau
udvidede.extend(expand_template(under_skabelon, under_prefix, under_prefix_felt, ny_rod))
else:
nyt_item = item.copy()
nyt_item["navn"] = prefix + item["navn"]
if "felt" in nyt_item and nyt_item["felt"]:
nyt_item["felt"] = prefix_felt + item["felt"]
# Sæt roden hvis den findes, ellers fejler vi ikke her (da den kan mangle helt)
if ny_rod:
nyt_item["rod"] = ny_rod
udvidede.append(nyt_item)
return udvidede
for output_fil in config.get("output_filer", []):
nye_kolonner = []
for kol in output_fil.get("kolonner", []):
if "skabelon" in kol:
skabelon_rod = kol.get("rod", "")
prefix = kol.get("prefix", kol.get("prefix_navn", ""))
prefix_felt = kol.get("prefix_felt", "")
if prefix_felt:
if not prefix_felt.endswith("."):
prefix_felt += "."
nye_kolonner.extend(expand_template(
kol["skabelon"],
prefix,
prefix_felt,
skabelon_rod
))
else:
nye_kolonner.append(kol)
output_fil["kolonner"] = nye_kolonner
return config

View File

@@ -0,0 +1,62 @@
import os
import json
import base64
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def læs_json_fil(global_config: dict) -> tuple[str, str, str, str, str]:
"""Henter database-credentials fra pwd.json baseret på miljø i global_config."""
base_path = os.environ.get("PMROOTDIR")
if not base_path:
logger.error("Miljøvariablen 'PMROOTDIR' er ikke sat.")
raise EnvironmentError("Miljøvariablen 'PMROOTDIR' er ikke sat.")
if global_config["miljø"].lower() == "prd":
env = "BASE_PROD"
host = "sdpaseprdbase.ccta.dk"
port = "7001"
elif global_config["miljø"].lower() == "pre":
env = "BASE_PRE"
host = "sdpasepredb01.ccta.dk"
port = "7101"
elif global_config["miljø"].lower() == "udv":
env = "BASE_UDV"
host = "sdpaseudvdb01.ccta.dk"
port = "7301"
else:
logger.error(f"Ukendt database-miljø: '{global_config['miljø']}'.")
raise ValueError(f"Ukendt database-miljø: '{global_config['miljø']}'.")
filnavn = os.path.join(base_path, "tools", "nogler", "pwd.json")
brugernavn = ""
password = ""
try:
with open(filnavn, 'r', encoding='utf-8') as fil:
data = json.load(fil)
for nøgle, værdier in data.items():
if nøgle == env:
if len(værdier) >= 2:
brugernavn = værdier[0]
password = base64.b64decode(værdier[1]).decode("utf-8")
else:
logger.error("Fejl i data, ikke nok værdier!")
raise ValueError(f"Fejl i credentials-fil: ikke nok værdier for '{env}'.")
except FileNotFoundError:
logger.error(f"Credentials-filen '{filnavn}' blev ikke fundet.")
raise
except json.JSONDecodeError as e:
logger.error(f"Fejl ved indlæsning af JSON fra '{filnavn}': {e}")
raise
if brugernavn == "":
raise ValueError("Kunne ikke finde brugernavn og adgangskode i credentials-filen.")
return brugernavn, password, env, host, port

View File

@@ -0,0 +1,129 @@
import os
import json
import codecs
import re
import xmltodict
from pathlib import Path
from typing import Any, Generator, Optional
from charset_normalizer import from_path
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def detect_file_info(file_path: str) -> tuple[str, str]:
"""
Returnerer (file_type, encoding).
file_type ∈ {'xml','json','ukendt'}.
Robust mod BOM og UTF-16/32.
"""
# --- filendelse som hint ---
ext = os.path.splitext(file_path)[1].lower()
ext_hint = "xml" if ext == ".xml" else "json" if ext == ".json" else None
# --- sniff bytes ---
with open(file_path, "rb") as f:
data = f.read(4096)
if not data:
# tom fil → ukendt, men behold evt. hint
enc_guess = "utf-8"
return (ext_hint or "ukendt", enc_guess)
# strip BOMs (for at gøre type-sniff lettere)
for bom in (codecs.BOM_UTF8, codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE,
codecs.BOM_UTF32_LE, codecs.BOM_UTF32_BE):
if data.startswith(bom):
data = data[len(bom):]
break
# tillad nulbytes imellem tegn (UTF-16/32)
def spaced_bytes(s: bytes) -> bytes:
return b"".join(re.escape(bytes([b])) + b"\x00*" for b in s)
xml_patterns = [
re.compile(spaced_bytes(b"<?xml"), re.IGNORECASE),
re.compile(spaced_bytes(b"<!--"), re.IGNORECASE),
re.compile(spaced_bytes(b"<"), re.IGNORECASE),
]
json_patterns = [
re.compile(spaced_bytes(b"{")),
re.compile(spaced_bytes(b"[")),
]
data_l = re.sub(br"^[\x00\s]+", b"", data)
is_xml = any(p.search(data_l) for p in xml_patterns)
is_json = any(p.search(data_l) for p in json_patterns)
if is_xml and not is_json:
ftype = "xml"
elif is_json and not is_xml:
ftype = "json"
elif is_xml and is_json:
ftype = ext_hint or "ukendt"
else:
ftype = ext_hint or "ukendt"
# --- encoding-detect (charset-normalizer) ---
res = from_path(file_path).best()
enc = res.encoding if res else "utf-8"
logger.debug(f"Detekteret type={ftype}, encoding={enc} for {os.path.basename(file_path)}")
return (ftype, enc)
def læs_filer(
config: dict,
input_fil: Optional[str] = None,
input_fil_liste: Optional[str] = None,
keep_prefixes: bool = True,
) -> Generator[Any, None, None]:
"""
Generator der læser én eller flere filer:
- Sniffer type+encoding (detect_file_info)
- Åbner med korrekt encoding (fallback ved decode-fejl)
- Parser JSON/XML (xmltodict)
- keep_prefixes=True: bevar 'cm:Tag' (anbefalet til din præfiks-fallback)
keep_prefixes=False: brug process_namespaces=True → '{uri}LocalName'
"""
stop_file_path = os.path.join(config["output_path"], "stop.txt")
def load_file(file_path):
file_type, encoding = detect_file_info(file_path)
logger.info(f"Læser filen: {file_path} (type={file_type}, encoding={encoding})")
config["current_file"] = os.path.basename(file_path)
# Læs med detekteret encoding; fallback hvis nødvendigt
try:
with open(file_path, 'r', encoding=encoding) as f:
content = f.read()
except UnicodeDecodeError:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
if file_type == "json":
return json.loads(content)
elif file_type == "xml":
if keep_prefixes:
# bevar 'cm:Tag' nøgler → passer til din eksisterende prefix-fallback
return xmltodict.parse(content)
else:
# brug '{uri}LocalName' nøgler (kræver din URI-matchlogik)
return xmltodict.parse(content, process_namespaces=True)
else:
raise ValueError(f"Ukendt eller ikke-understøttet filtype: {file_path}")
if input_fil:
yield load_file(input_fil)
elif input_fil_liste:
logger.info("Læser fillisten: " + input_fil_liste)
with open(input_fil_liste, 'r', encoding="utf-8") as f:
for file_name in f.read().splitlines():
fil = Path(stop_file_path)
if fil.is_file():
logger.info("Stop-fil fundet afbryder fillæsning.")
return
if not file_name.strip():
continue
yield load_file(file_name)

View File

@@ -0,0 +1,217 @@
from typing import Any, Generator, Union, List
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def _extract_text_node(v: Any) -> Any:
"""
Udtrækker ren tekstværdi fra et xmltodict-node hvis muligt.
Har noden '#text' og ellers kun '@...'-nøgler, returneres v['#text'].
Ellers returneres v uændret.
"""
if isinstance(v, dict) and "#text" in v:
other_keys = [k for k in v.keys() if k != "#text"]
if all(k.startswith("@") for k in other_keys):
return v["#text"]
return v
def _resolve_with_indices(obj: Any, path: str, sti_index: dict, base_path: str = "") -> Any:
"""
Følger en dot-sti og bruger sti_index til at vælge elementer
hver gang vi rammer en liste. base_path er den allerede-resolverede
sti (brug den når du starter nede i en 'rod').
Eksempler på sti_index:
{"orders": 1, "orders.items": 0}
"""
if path in (None, "", "."):
return obj
parts = [p for p in path.split(".") if p != ""]
cur = obj
acc = base_path.strip(".")
for key in parts:
# Hvis vi står på en liste, vælg det aktuelle index for den akkumulerede sti
if isinstance(cur, list):
idx_key = acc
idx = sti_index.get(idx_key, 0)
try:
cur = cur[idx]
except (TypeError, IndexError):
return None
if key == "#text":
if isinstance(cur, dict):
cur = cur.get("#text")
acc = f"{acc}.#text" if acc else "#text"
continue
else:
return cur
# Slå næste nøgle op i dict med prefix-fallback
if isinstance(cur, dict):
match = find_nøgle_med_prefix_fallback(cur, key)
if match is None:
return None
cur = cur.get(match)
acc = match if not acc else f"{acc}.{match}"
else:
return None
# Hvis vi ender på en liste, vælg index én sidste gang
if isinstance(cur, list):
idx = sti_index.get(acc, 0)
try:
cur = cur[idx]
except (TypeError, IndexError):
return None
return _extract_text_node(cur)
def sti_findes(obj: Any, sti: str, sti_index: dict) -> bool:
"""
Returnerer True hvis stien kan opløses relativt til obj.
Bruges af 'hvis_findes'.
"""
if not sti:
return False
kandidat = _resolve_with_indices(obj, sti, sti_index, base_path="")
return kandidat is not None
def matcher_hvis_findes(obj: Any, hvis_findes: Any, sti_index: dict) -> bool:
"""
OR-logik:
- hvis 'hvis_findes' ikke er angivet -> True
- hvis listen er tom -> True
- ellers True hvis mindst én sti findes
"""
if not hvis_findes:
return True
if isinstance(hvis_findes, str):
hvis_findes = [hvis_findes]
return any(sti_findes(obj, sti, sti_index) for sti in hvis_findes)
def find_nøgle_med_prefix_fallback(obj: dict, nøgle: str) -> str | None:
"""Slår en nøgle op i et dict med fallback til namespace-præfiks (fx 'cm:Navn')."""
if nøgle in obj:
return nøgle
for key in obj:
if ":" in key and key.split(":")[-1] == nøgle:
return key
return None
def rekursiv_udpakning(obj, sti: Union[str, List[str]], base_path="", path_index=None):
if sti in [None, "", [], "."]:
yield obj, path_index or {}
return
# Ny: rod="*" → iterér værdier i dict (og bevar nøglen i sti_index["__key"])
if sti in ("*", ".*"):
if isinstance(obj, dict):
for k, v in obj.items():
ny_index = (path_index or {}).copy()
ny_index["__key"] = k # så @key virker
yield v, ny_index
elif isinstance(obj, list):
for i, v in enumerate(obj):
ny_index = (path_index or {}).copy()
ny_index[(base_path or "").rstrip(".")] = i
yield v, ny_index
else:
# ikke-liste/dict → bare returnér objektet som er
yield obj, path_index or {}
return
if isinstance(sti, str):
sti_dele = sti.split(".")
else:
sti_dele = sti
if not sti_dele:
yield obj, path_index or {}
return
nøgle = sti_dele[0]
resten = sti_dele[1:]
# NYT: wildcard-sti-segment
if nøgle == "*":
if isinstance(obj, dict):
for k, v in obj.items():
ny_index = (path_index or {}).copy()
ny_index["__key"] = k # så @key virker
# base_path + k + "." afspejler at vi "går ned" i dict'en
yield from rekursiv_udpakning(v, resten, base_path + k + ".", ny_index)
elif isinstance(obj, list):
for idx, v in enumerate(obj):
ny_index = (path_index or {}).copy()
ny_index[base_path.rstrip(".")] = idx
yield from rekursiv_udpakning(v, resten, base_path, ny_index)
return
if isinstance(obj, dict):
matchende_nøgle = find_nøgle_med_prefix_fallback(obj, nøgle)
if matchende_nøgle is None:
return
næste = obj.get(matchende_nøgle)
ny_path_index = (path_index or {}).copy()
ny_path_index[(base_path + matchende_nøgle).strip(".")] = 0
if isinstance(næste, list):
for idx, element in enumerate(næste):
ny_path_index = (path_index or {}).copy()
ny_path_index[base_path + matchende_nøgle] = idx
yield from rekursiv_udpakning(
element, resten, base_path + matchende_nøgle + ".", ny_path_index
)
else:
yield from rekursiv_udpakning(
næste, resten, base_path + matchende_nøgle + ".", path_index
)
elif isinstance(obj, list):
for idx, item in enumerate(obj):
ny_path_index = (path_index or {}).copy()
ny_path_index[base_path.rstrip(".")] = idx
yield from rekursiv_udpakning(
item, sti_dele, base_path, ny_path_index
)
def hent_objekt_fra_sti(root_obj: Any, sti: str, path_index: dict) -> Any:
"""Returnerer objektet ved den angivne dot-sti relativt til root_obj."""
return _resolve_with_indices(root_obj, sti, path_index, base_path="")
def hent_fra_objekt_med_prefix_fallback(obj: Any, nøgle: str) -> Any:
"""Slår nøgle op i obj med namespace-præfiks fallback og returnerer tekstnoden."""
if not isinstance(obj, dict):
return None
# 1. Direkte opslag
if nøgle in obj:
return _extract_text_node(obj[nøgle])
# 2. Hvis vi har 'CountryCode', så prøv fx 'c:CountryCode', 'cm:CountryCode'
for key in obj.keys():
if ":" in key and key.split(":")[-1] == nøgle:
return _extract_text_node(obj[key])
return None
def hent_felt(obj: Any, sti: str) -> Any:
"""Følger en dot-sti i obj uden sti_index bruges til simple opslag."""
dele = sti.split('.')
for del_navn in dele:
if isinstance(obj, list):
return [hent_felt(item, '.'.join(dele[dele.index(del_navn):])) for item in obj]
elif isinstance(obj, dict):
obj = hent_fra_objekt_med_prefix_fallback(obj, del_navn)
else:
return None
return obj

View File

@@ -0,0 +1,99 @@
import pyodbc
from typing import Any
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def insert_rows_ase(
conn: pyodbc.Connection,
table_name: str,
columns: list[str],
rows: list[dict],
batch_size: int = 1000,
use_fast_executemany: bool = True,
) -> tuple[int, list[dict]]:
"""
Indsætter rows i ASE-tabellen i batches.
Returnerer (indsatte_antal, db_fejl_rows) hvor db_fejl_rows er liste af dicts + '_db_error'.
- Bruger fast_executemany hvis ønsket.
- Ved batch-fejl ruller den tilbage og prøver enkeltvis for at isolere fejlrækker.
"""
if not rows:
return 0, []
# ASE tåler som regel ukvoterede kolonnenavne, men har I specialtegn/uppercases: brug "Col"
col_list = ", ".join(columns)
placeholders = ", ".join(["?"] * len(columns))
sql = f"INSERT INTO {table_name} ({col_list}) VALUES ({placeholders})"
logger.debug(f"INSERT til {table_name}: {len(rows)} rækker, batch_size={batch_size}")
cur = conn.cursor()
if use_fast_executemany:
# I jeres tests gav det værdi—så sætter vi det
cur.fast_executemany = True
params = [[r.get(c) for c in columns] for r in rows]
inserted = 0
failed = []
try:
for i in range(0, len(params), batch_size):
chunk = params[i:i+batch_size]
try:
cur.executemany(sql, chunk)
inserted += len(chunk)
conn.commit()
except Exception:
logger.warning(f"Batch fejlede ved offset {i} prøver enkeltvis")
# Batch fejlede find de enkelte fejlrækker
conn.rollback()
for off, one in enumerate(chunk):
try:
cur.execute(sql, one)
inserted += 1
except Exception as e_row:
# Bevar original data og tilføj fejl
bad = dict(rows[i + off])
bad["_db_error"] = str(e_row)
failed.append(bad)
conn.commit()
return inserted, failed
except Exception as e:
logger.error(f"Kritisk fejl ved INSERT til {table_name}: {e}")
# Noget stort gik galt markér alle som fejlet
conn.rollback()
for r in rows:
bad = dict(r)
bad["_db_error"] = str(e)
failed.append(bad)
return inserted, failed
def get_ase_connection_windows(
user: str,
password: str,
host: str,
port: str,
database: str,
autocommit: bool = False,
) -> pyodbc.Connection:
"""Opretter og returnerer en pyodbc-forbindelse til Sybase ASE på Windows."""
conn_str = (
f"DRIVER={{Adaptive Server Enterprise}};"
f"SERVER={host};"
f"PORT={port};"
f"DATABASE={database};"
f"UID={user};"
f"PWD={password};"
)
conn = pyodbc.connect(conn_str, autocommit=autocommit)
cur = conn.cursor()
cur.execute("SET QUOTED_IDENTIFIER ON")
cur.close()
return conn

View File

@@ -0,0 +1,59 @@
import os
import csv
import time
from typing import Any, Callable
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def skriv_fil_med_retry(
skrivefunktion: Callable,
filsti: str,
max_forsøg: int = 10,
ventetider: list[int] = None,
) -> Any:
"""Forsøger at kalde skrivefunktion op til max_forsøg gange med stigende ventetid ved PermissionError."""
sidste_exception = None
if ventetider is None:
ventetider = [2, 5, 10, 15, 20, 30, 40, 50, 60, 100]
for i in range(max_forsøg):
try:
return skrivefunktion()
except PermissionError as e:
delay = ventetider[min(i, len(ventetider)-1)]
logger.warning(f"Fejl ved skrivning til fil '{filsti}' (forsøg {i+1}/{max_forsøg}) (venter: {delay} sekunder): {e}")
sidste_exception = e
time.sleep(delay)
raise sidste_exception
def generer_filer_med_overskrifter(
overskrifter: bool,
output_file: str,
columns: list,
global_config: dict,
) -> None:
"""Opretter output-fil og skriver kolonneoverskrifter hvis konfigureret."""
tilstand = global_config["skrivetilstand"]
enc = global_config["encoding"]
separator = global_config['separator']
logger.debug(f"generer_filer_med_overskrifter kaldt: fil={output_file}, tilstand={tilstand}, enc={enc}")
if tilstand.upper() == 'W':
with open(output_file, 'w', encoding=enc) as out_fil:
if overskrifter:
out_fil.write(separator.join(col["navn"] if "navn" in col else f"Kolonne_{idx+1}" for idx, col in enumerate(columns)) + "\n")
if tilstand.upper() == 'A':
# TODO: out_fil er ikke åbnet i 'A'-tilstanden denne gren virker ikke korrekt
if os.path.exists(output_file):
if os.path.getsize(output_file) == 0:
if overskrifter:
out_fil.write(separator.join(col["navn"] if "navn" in col else f"Kolonne_{idx+1}" for idx, col in enumerate(columns)) + "\n")

View File

@@ -0,0 +1,145 @@
from typing import Any
from bs4 import BeautifulSoup
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def fjern_linjeskift(data: dict, file_config: dict, global_config: dict) -> dict:
"""Fjerner linjeskift fra kolonner markeret med fjern_linjeskift: true i YAML."""
if not isinstance(data, dict) or "rækker" not in data:
return data
kolonner = file_config.get("kolonner", [])
for kolonne in kolonner:
if not kolonne.get("fjern_linjeskift", False):
continue
kolonnenavn = kolonne.get("navn")
for række in data["rækker"]:
værdi = række.get(kolonnenavn)
if isinstance(værdi, str):
værdi_ny = værdi.replace("\r\n", "").replace("\n", "").replace("\r", "")
else:
værdi_ny = værdi
række[kolonnenavn] = værdi_ny
return data
def perform_strip(text: Any) -> Any:
"""Fjerner HTML/XML-tags fra en tekststreng via BeautifulSoup."""
if not text or "<" not in text: # Hurtigt tjek om der overhovedet er tags
return text
# Detekter om det er XML/XHTML eller almindelig HTML
parser = "xml" if "<?xml" in text else "lxml"
try:
soup = BeautifulSoup(text, parser)
# Vi bruger \n som separator for at sikre, at <br/> og blok-tags
# (h1, p) ikke mases sammen. strip=True fjerner whitespace i start/slut.
return soup.get_text(separator="\n", strip=True)
except Exception:
# Hvis parseren fejler, returneres den rå tekst (sikkerhedsnet)
return text
def tag_strip(data: dict, file_config: dict, global_config: dict) -> dict:
"""Stripper HTML/XML-tags fra kolonner markeret med strip_tags: true i YAML."""
if not isinstance(data, dict) or "rækker" not in data:
return data
for kolonne in file_config.get("kolonner", []):
navn = kolonne.get("navn")
if not navn:
continue
strip_tags = kolonne.get("strip_tags", False)
for række in data["rækker"]:
v = række.get(navn)
if v is None:
continue
if strip_tags:
# Vi konverterer til string og stripper tags i ét hug
række[navn] = perform_strip(str(v))
else:
# Hvis der ikke skal strippes tags, bevares værdien (som string)
række[navn] = str(v)
return data
def upper_lower(data: dict, file_config: dict, global_config: dict) -> dict:
"""Konverterer kolonneværdier til upper- eller lowercase efter YAML-konfiguration."""
if not isinstance(data, dict) or "rækker" not in data:
return data
for kolonne in file_config.get("kolonner", []):
navn = kolonne.get("navn")
if not navn:
continue
ucase = kolonne.get("ucase", False)
lcase = kolonne.get("lcase", False)
if ucase and lcase:
continue # undgå konflikt
for række in data["rækker"]:
v = række.get(navn)
if v is None:
continue
s = str(v)
if ucase:
række[navn] = s.upper()
elif lcase:
række[navn] = s.lower()
return data
def rens(data: dict, file_config: dict, global_config: dict) -> dict:
"""Anvender regex- og tegnbaserede renseregler fra global_config på markerede kolonner."""
if not isinstance(data, dict) or "rækker" not in data:
return data
rens_regex = global_config.get("rens_intervaller_regex")
rens_erstats = global_config.get("rens_intervaller_erstats", [])
rens_chars = global_config.get("rens_all_chars", {})
rens_regex_rules = global_config.get("rens_regex_rules", [])
# ingenting at gøre?
if rens_regex is None and not rens_chars and not rens_regex_rules:
return data
kolonner = file_config.get("kolonner", [])
for kolonne in kolonner:
if not kolonne.get("rens", False):
continue
kolonnenavn = kolonne.get("navn")
for række in data["rækker"]:
værdi = række.get(kolonnenavn)
if isinstance(værdi, str):
tmp = værdi
# 1) Frie regex-regler (kører først)
for cre, repl in rens_regex_rules:
# brug match.expand for at understøtte \1 / \g<1> i YAML
tmp = cre.sub(lambda m, r=repl: m.expand(r), tmp)
# 2) Intervaller (som før)
if rens_regex:
def regex_erstat(match):
for group_name, val in match.groupdict().items():
if val:
index = int(group_name[1:]) # fx 'i0' → 0
return rens_erstats[index]
return match.group(0)
tmp = rens_regex.sub(regex_erstat, tmp)
# 3) Enkelttegn (som før)
for char, erstat in rens_chars.items():
tmp = tmp.replace(char, erstat)
else:
tmp = værdi
række[kolonnenavn] = tmp
return data

View File

@@ -0,0 +1,195 @@
import re
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Optional
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def _parse_number(value: Any) -> Decimal:
"""
Konverterer en talværdi til Decimal.
Håndterer EU-format (punktum som tusindtalsseparator, komma som decimal)
samt US-format og rene integers. Kaster ValueError ved ugyldigt format.
"""
if isinstance(value, (int, float, Decimal)):
return Decimal(str(value))
s = str(value).strip()
# Fjern valuta og ikke-tal (men bevar cifre, komma, punktum og minus)
s = re.sub(r'[^\d,.\-]', '', s)
# Håndter typiske EU-formater
if ',' in s and '.' in s:
# Antag '.' = tusind, ',' = decimal, fx "391.211,75" -> "391211.75"
s = s.replace('.', '').replace(',', '.')
elif ',' in s:
# Kun komma => decimal komma, fx "391211,75" -> "391211.75"
s = s.replace(',', '.')
else:
# Kun punktum eller rene cifre -> int/US-format
pass
# Tom streng eller bare "-" er ugyldig
if s in ('', '-', '.'):
raise ValueError(f"Ugyldigt talformat: {value!r}")
return Decimal(s)
def konverter(data: dict, file_config: dict, global_config: dict) -> dict:
"""
Konverterer kolonneværdier i data til de typer der er angivet i file_config.
Håndterer string, integer, float, decimal, boolean og date.
Rækker med fejl samles i data['fejlede_rækker'] hvis fejl_fil er konfigureret,
ellers kastes en exception.
"""
if not isinstance(data, dict) or "rækker" not in data:
return data
fejl_fil = global_config.get("fejl_fil_ext", None)
kolonner = file_config.get("kolonner", [])
nye_rækker = []
fejlede_rækker = []
for række in data["rækker"]:
ny_række = række.copy()
fejl_i_række = False
for kolonne in kolonner:
field_type = kolonne.get("type", "string")
string_max_len = kolonne.get("max_længde", None)
string_truncate = kolonne.get("truncate", None)
if field_type == "string" and not string_max_len and not string_truncate:
continue
kolonnenavn = kolonne.get("navn")
value = ny_række.get(kolonnenavn)
krav = kolonne.get("påkrævet", False)
# Hvis værdien er None
if value is None:
if krav:
logger.warning(f"Påkrævet felt '{kolonnenavn}' mangler.")
if fejl_fil:
fejl_i_række = True
break
else:
raise ValueError(f"Påkrævet felt '{kolonnenavn}' mangler.")
else:
continue # spring konvertering over for denne kolonne
try:
if field_type in ["integer", "biginteger", "bigint"]:
tmp = int(value)
elif field_type in ["float", "decimal"]:
dec = _parse_number(value)
decimal_places = kolonne.get("decimaler", 2)
q = Decimal(10) ** (-decimal_places) # fx 2 -> Decimal('0.01')
dec = dec.quantize(q, rounding=ROUND_HALF_UP)
if field_type == "float":
tmp = f"{dec:.{decimal_places}f}"
else:
# "decimal" som streng bevaret med præcision
tmp = f"{dec:.{decimal_places}f}"
elif field_type == "boolean":
tmp = str(value).lower() in ["true", "1", "ja"]
elif field_type == "date":
tmp_value = str(value) # altid str
if '[' in tmp_value and tmp_value.endswith(']'):
tmp_value = tmp_value[:tmp_value.index('[')]
dato_ind_raw = kolonne.get("dato_ind", global_config.get("dato_ind"))
dato_ud = kolonne.get("dato_ud", global_config.get("dato_ud"))
# Tillad både string og liste
if isinstance(dato_ind_raw, str):
dato_ind_liste = [dato_ind_raw]
elif isinstance(dato_ind_raw, list):
dato_ind_liste = dato_ind_raw
else:
raise ValueError(f"'dato_ind' skal være streng eller liste, men var: {type(dato_ind_raw)}")
tmp_dato = None
parse_errors = []
for dato_ind in dato_ind_liste:
try:
if "%f" in dato_ind:
if re.search(r'\.\d+', tmp_value):
tmp_value_padded = re.sub(
r'\.(\d{1,6})',
lambda m: '.' + m.group(1).ljust(6, '0'),
tmp_value
)
else:
dato_ind = dato_ind.replace(".%f", "")
tmp_value_padded = tmp_value
else:
tmp_value_padded = tmp_value
# Fjern kolon i tidszonedelen: +03:00 → +0300, hvis %z bruges
if "%z" in dato_ind:
tmp_value_padded = re.sub(r'([+-]\d{2}):(\d{2})$', r'\1\2', tmp_value_padded)
tmp_dato = datetime.strptime(tmp_value_padded, dato_ind)
break # succes!
except ValueError as e:
parse_errors.append(f" - Format: {dato_ind} -> {e}")
if tmp_dato is None:
fejlbesked = "\n".join(parse_errors)
raise ValueError(f"Kunne ikke parse dato '{tmp_value}' med nogen af formaterne:\n{fejlbesked}")
# Output-format
if dato_ud.upper().strip() == "SYBASE":
tmp = tmp_dato.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
elif dato_ud.upper().strip() == "DATE":
tmp = tmp_dato.strftime('%Y-%m-%d')
elif dato_ud.upper().strip() == "INFORMATICA_US":
tmp = tmp_dato.strftime('%m/%d/%Y %H:%M:%S.%f')
else:
tmp = tmp_dato.strftime(dato_ud)
elif field_type == "string":
value = str(value)
if string_max_len and len(value) > string_max_len:
tmp = value[:string_max_len - 3] + "..."
elif string_truncate and len(value) > string_truncate:
tmp = value[:string_truncate]
else:
tmp = value
elif field_type in ["hash", "id", "file", "rod_variant"]:
tmp = value
else:
raise ValueError(f"Ukendt datatype '{field_type}' for feltet '{kolonnenavn}'.")
ny_række[kolonnenavn] = tmp
except (ValueError, TypeError) as e:
logger.error(f"[CONVERT]Fejl ved konvertering af felt '{kolonnenavn}' med værdi '{value}': {e}")
if fejl_fil:
fejl_i_række = True
break # Stop konvertering af denne række
else:
raise e
if fejl_i_række:
fejlede_rækker.append(række) # Tilføj original række
else:
nye_rækker.append(ny_række)
# Overskriv med gyldige rækker
data["rækker"] = nye_rækker
if fejl_fil:
data["fejlede_rækker"] = fejlede_rækker
return data

View File

@@ -0,0 +1,190 @@
from itertools import product
from typing import Any
from udpak_semistruktur.logger import hent_logger
logger = hent_logger(__name__)
def _opfylder_betingelse(værdi: Any, operator: str, sammenlign_værdi: str) -> bool:
"""Evaluerer om værdi opfylder betingelsen defineret af operator og sammenlign_værdi."""
if værdi is None:
return False
# Lidt spidsfindig. Men hvis tal sammenligning skal virke korrekt, så skal man bruge tal. - Ellers prøver vi med strenge.
try:
sammenlign = float(sammenlign_værdi)
værdi = float(værdi)
except (ValueError, TypeError):
sammenlign = str(sammenlign_værdi)
værdi = str(værdi)
if operator == "=":
return værdi == sammenlign
elif operator in ("<>", "!="):
return værdi != sammenlign
elif operator == ">":
return værdi > sammenlign
elif operator == "<":
return værdi < sammenlign
elif operator == ">=":
return værdi >= sammenlign
elif operator == "<=":
return værdi <= sammenlign
return False
def flatten(data: dict, file_config: dict) -> dict:
"""Ekspanderer listede kolonner til separate rækker via kartesisk produkt."""
if not isinstance(data, dict) or "rækker" not in data:
return data
kolonner = file_config.get("kolonner")
flatten_kolonner = [k for k in kolonner if k.get("flatten", False)]
flatten_kolonnenavne = [k.get("navn") for k in flatten_kolonner]
nye_rækker = []
for række in data["rækker"]:
# Saml værdier fra flatten-kolonner
flatten_values = []
for navn in flatten_kolonnenavne:
værdi = række.get(navn)
if isinstance(værdi, list):
flatten_values.append(værdi)
else:
flatten_values.append([værdi]) # Gør til liste for at indgå i produkt
# Alle kombinationer
for kombi in product(*flatten_values):
ny_række = {k: v for k, v in række.items() if k not in flatten_kolonnenavne}
ny_række.update(dict(zip(flatten_kolonnenavne, kombi)))
nye_rækker.append(ny_række)
data["rækker"] = nye_rækker
return data
def join(data: dict, file_config: dict) -> dict:
"""Samler listeværdier i kolonner til en enkelt streng med separator."""
if not isinstance(data, dict) or "rækker" not in data:
return data
kolonner = file_config["kolonner"]
for kolonne in kolonner:
join_kolonne = kolonne.get("join", False)
if join_kolonne:
separator = kolonne.get("join_separator", '|')
for række in data['rækker']:
værdi = række.get(kolonne['navn'])
if isinstance(værdi, list):
tmp = separator.join(str(v) for v in værdi)
else:
tmp = værdi
række[kolonne['navn']] = tmp
return data
def where(data: dict, file_config: dict, global_config: dict) -> dict:
"""Filtrerer rækker baseret på where-betingelser defineret i YAML."""
if not isinstance(data, dict) or "rækker" not in data:
return data
kolonner = file_config.get("kolonner", [])
for kolonne in kolonner:
if not kolonne.get("where", None):
continue
condition_streng = kolonne.get("where")
operators = ["<>", "!=", ">=", "<=", "=", ">", "<"]
operator = None
value = None
for op in operators:
if condition_streng.startswith(op):
operator = op
value = condition_streng[len(op):]
break
if operator is None:
continue
kolonnenavn = kolonne.get("navn")
rækker = [r for r in data["rækker"] if _opfylder_betingelse(r.get(kolonnenavn), operator, value)]
data["rækker"] = rækker
return data
def id_felt(data: dict, file_config: dict) -> dict:
"""Tildeler auto-incrementerende id-værdier til kolonner af type 'id'."""
if not isinstance(data, dict) or "rækker" not in data:
return data
kolonner = file_config.get("kolonner", [])
for kolonne in kolonner:
felt_type = kolonne.get("type", None)
if felt_type == 'id':
start = kolonne.get("startværdi", 1)
forøgelse = kolonne.get("forøgelse", 1)
navn = kolonne.get("navn")
val = start
for række in data['rækker']:
række[navn] = val
val += forøgelse
return data
def sammensat_noegle(data: dict, file_config: dict, global_config: dict) -> dict:
"""
Bygger en sammensat nøgle ud fra andre kolonner i samme række.
YAML på kolonnen:
- type: sammensat_nøgle
- felter: [colA, colB, ...] (kolonnenavne)
- separator: "|" (default: "|")
- allow_empty: false (default: false)
- false => hvis et felt mangler/er tomt => resultat None
- true => tomme felter bliver til "" og joines stadig
"""
if not isinstance(data, dict) or "rækker" not in data:
return data
kolonner = file_config.get("kolonner", [])
for kol in kolonner:
if str(kol.get("type", "")).lower() != "sammensat_nøgle":
continue
target = kol.get("navn")
felter = kol.get("felter") or kol.get("fields")
if not target or not isinstance(felter, list) or len(felter) == 0:
raise ValueError("sammensat_nøgle kræver 'navn' og 'felter: [..]' i YAML")
sep = kol.get("separator", "|")
allow_empty = bool(kol.get("allow_empty", False))
for række in data["rækker"]:
parts = []
missing = False
for f in felter:
v = række.get(f)
if v is None or v == "":
if allow_empty:
parts.append("")
else:
missing = True
break
else:
parts.append(str(v))
række[target] = None if missing else sep.join(parts)
return data