Files
udpak_semistruktur/udpak_semistruktur/extract/extractor.py
2026-04-04 02:54:30 +02:00

192 lines
6.9 KiB
Python

import re
from typing import Any, Optional
from udpak_semistruktur.logger import hent_logger
from udpak_semistruktur.utils import er_tom, evaluer_værdi_token, EMPTY_SENTINELS
from udpak_semistruktur.extract.traversal import (
rekursiv_udpakning,
hent_objekt_fra_sti,
_resolve_with_indices,
matcher_hvis_findes,
_extract_text_node,
)
logger = hent_logger(__name__)
def udvid_rod_alternativer(rod: str) -> list[dict]:
"""Udvider en rod-streng med [a,b,c]-alternativ-syntaks til en liste af rod/variant-dicts."""
if not isinstance(rod, str) or "[" not in rod:
return [{"rod": rod, "variant": None}]
m = re.fullmatch(r"(.*)\[([^\[\]]+)\](.*)", rod)
if not m:
return [{"rod": rod, "variant": None}]
prefix = m.group(1)
choices = [x.strip() for x in m.group(2).split(",") if x.strip()]
suffix = m.group(3)
return [
{
"rod": f"{prefix}{choice}{suffix}",
"variant": choice
}
for choice in choices
]
def hent_fra_spec(spec, default_el, json_data, sti_index, global_config, returner_liste=False) -> tuple[Any, bool]:
"""Henter en værdi fra json_data baseret på en spec-definition. Returnerer (værdi, mangler)."""
if spec is None:
return None, False
if "værdi" in spec and spec.get("værdi") is not None:
return evaluer_værdi_token(spec["værdi"], global_config), False
felt = spec.get("felt")
if felt == "@key":
return (sti_index or {}).get("__key"), False
rod = spec.get("rod")
if felt is None:
return None, True
if felt == ".":
return default_el, False
# Vælg rod/base
if rod:
parent_obj = hent_objekt_fra_sti(json_data, rod, sti_index)
if not isinstance(parent_obj, (dict, list)):
return None, True
kandidat = _resolve_with_indices(parent_obj, felt, sti_index, base_path=rod)
else:
if not isinstance(default_el, (dict, list)):
return None, True
if returner_liste:
# Brug rekursiv_udpakning til at samle alle værdier langs stien
# fx felt="linjer.produkt" giver ["Produkt A", "Produkt B"]
resultater = []
for element, _ in rekursiv_udpakning(default_el, felt):
if element not in EMPTY_SENTINELS:
resultater.append(_extract_text_node(element))
return resultater if resultater else None, False
else:
kandidat = _resolve_with_indices(default_el, felt, sti_index, base_path="")
# Hvis kandidaten er en liste og vi ikke ønsker at returnere hele listen,
# vælg det aktuelle element via sti_index som normalt.
# Hvis returner_liste=True, returneres hele listen uberørt til fx join.
if isinstance(kandidat, list) and not returner_liste:
kandidat = kandidat[0] if kandidat else None
missing = kandidat is None and not isinstance(default_el, list)
return kandidat, missing
def hent_kolonne_værdi_med_fallback(kol: dict, el: Any, json_data: Any, sti_index: dict, global_config: dict) -> Any:
"""Henter kolonneværdi med støtte for missing_fallback og tom_fallback."""
kolnavn = kol.get("navn")
# 1) Primær kilde
if kol.get("felt") is None:
raw_value = kol.get("værdi", None)
værdi = evaluer_værdi_token(raw_value, global_config) if raw_value is not None else None
missing = raw_value is None
else:
returner_liste = bool(kol.get("join", False) or kol.get("flatten", False))
primær_spec = {"felt": kol["felt"]}
if kol.get("rod"):
primær_spec["rod"] = kol["rod"]
værdi, missing = hent_fra_spec(
primær_spec, el, json_data, sti_index, global_config,
returner_liste=returner_liste
)
# 2) Hvis missing -> prøv missing_fallback
if missing:
mf = kol.get("missing_fallback")
if isinstance(mf, dict):
v2, _ = hent_fra_spec(mf, el, json_data, sti_index, global_config)
if not er_tom(v2):
logger.debug(f"[UDTRÆK][FALLBACK][missing] kolonne={kolnavn} brugte missing_fallback={mf}")
return v2
# 3) Hvis tom -> prøv tom_fallback
if er_tom(værdi):
tf = kol.get("tom_fallback")
if isinstance(tf, dict):
v3, _ = hent_fra_spec(tf, el, json_data, sti_index, global_config)
if not er_tom(v3):
logger.debug(f"[UDTRÆK][FALLBACK][tom] kolonne={kolnavn} brugte tom_fallback={tf}")
return v3
return værdi
def generer_datafil(json_data: Any, yaml_config: dict, global_config: dict) -> dict:
"""
Udtrækker rækker fra json_data baseret på yaml_config.
Returnerer dict med 'header' og 'rækker'.
"""
output_filer = {}
rod_sti = yaml_config["rod"]
kolonner = yaml_config["kolonner"]
hvis_findes = yaml_config.get("hvis_findes")
# Særligt robust for dict-af-dicts ved rod="*"
if rod_sti in ("*", ".*") and isinstance(json_data, dict):
# Gem nøglen i sti_index["__key"] så @key kan bruges
objekter = [(v, {"__key": k}) for k, v in json_data.items()]
else:
objekter = []
for rv in udvid_rod_alternativer(rod_sti):
for element, sti_index in rekursiv_udpakning(json_data, rv["rod"]):
ny_sti_index = dict(sti_index or {})
if rv.get("variant") is not None:
ny_sti_index["__rod_variant"] = rv["variant"]
ny_sti_index["__rod_path"] = rv["rod"]
objekter.append((element, ny_sti_index))
rækker = []
header = [k["navn"] for k in kolonner]
for element, sti_index in objekter:
if element in EMPTY_SENTINELS:
continue
elementer = element if isinstance(element, list) else [element]
for el in elementer:
if el in EMPTY_SENTINELS:
continue
if hvis_findes and not matcher_hvis_findes(el, hvis_findes, sti_index):
logger.debug(f"[UDTRÆK][hvis_findes] springer over record; ingen af stierne findes: {hvis_findes}")
continue
base_række = {}
for kol in kolonner:
navn = kol.get("navn")
ktype = kol.get("type")
if ktype == "rod_variant":
base_række[navn] = (sti_index or {}).get("__rod_variant")
elif ktype == "rod_path":
base_række[navn] = (sti_index or {}).get("__rod_path")
elif kol.get("felt") == ".":
base_række[navn] = el
else:
base_række[navn] = hent_kolonne_værdi_med_fallback(
kol, el, json_data, sti_index, global_config
)
rækker.append(base_række)
output_filer['header'] = header
output_filer['rækker'] = rækker
return output_filer