XML_til_YAML_DONE
This commit is contained in:
@@ -1 +0,0 @@
|
||||
,carsten,carsten-mint,04.04.2026 02:27,file:///home/carsten/.config/libreoffice/4;
|
||||
@@ -1 +0,0 @@
|
||||
,carsten,carsten-mint,04.04.2026 02:40,file:///home/carsten/.config/libreoffice/4;
|
||||
@@ -1 +0,0 @@
|
||||
,carsten,carsten-mint,04.04.2026 02:53,file:///home/carsten/.config/libreoffice/4;
|
||||
@@ -1 +0,0 @@
|
||||
,carsten,carsten-mint,04.04.2026 15:42,file:///home/carsten/.config/libreoffice/4;
|
||||
34
output.txt
Normal file
34
output.txt
Normal file
@@ -0,0 +1,34 @@
|
||||
Parser XSD: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/data/bankdata.xsd
|
||||
|
||||
Fandt 10 komplekse typer:
|
||||
ValutakurserType: 1 felter
|
||||
ValutaType: 3 felter
|
||||
KunderType: 1 felter
|
||||
KundeType: 8 felter
|
||||
AdresseType: 4 felter
|
||||
KontaktType: 2 felter
|
||||
KontiType: 1 felter
|
||||
KontoType: 7 felter
|
||||
PosteringerType: 1 felter
|
||||
PosteringType: 4 felter
|
||||
|
||||
Fandt 5 liste-elementer:
|
||||
bank (bank)
|
||||
valuta (bank.valutakurser.valuta)
|
||||
kunde (bank.kunder.kunde)
|
||||
konto (bank.kunder.kunde.konti.konto)
|
||||
Overliggende: ['kunde']
|
||||
postering (bank.kunder.kunde.konti.konto.posteringer.postering)
|
||||
Overliggende: ['kunde', 'konto']
|
||||
|
||||
Genererer YAML-filer i: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/skabeloner.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/nøgler.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/udtræk_bank.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/udtræk_valutakurser_valuta.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/udtræk_kunder_kunde.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/udtræk_kunder_kunde_konti_konto.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/udtræk_kunder_kunde_konti_konto_posteringer_postering.yaml
|
||||
Skrev: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/config_bankdata.yaml
|
||||
|
||||
Færdig.
|
||||
111
schema_dataklasser.py
Normal file
111
schema_dataklasser.py
Normal file
@@ -0,0 +1,111 @@
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
# ============================================================
|
||||
# Datastrukturer
|
||||
# ============================================================
|
||||
@dataclass
|
||||
class XsdChoiceElement:
|
||||
element_navn: str
|
||||
rod_sti: str
|
||||
overliggende: list[str]
|
||||
alternativer: list[tuple[str, str, str]] # (alt_navn, skabelon_navn, fuld_sti)
|
||||
|
||||
@dataclass
|
||||
class XsdFelt:
|
||||
"""
|
||||
Repræsenterer ét felt i en complexType – enten et element eller en attribut.
|
||||
|
||||
navn: Feltets navn, fx 'FirstName' eller 'currCode'
|
||||
er_attribut: True hvis det er en XML-attribut (@navn i YAML)
|
||||
xsd_type: Den originale XSD-type, fx 'xsd:string', 'xsd:date'
|
||||
påkrævet: True hvis feltet er obligatorisk
|
||||
er_liste: True hvis feltet kan forekomme flere gange (maxOccurs > 1)
|
||||
er_tekst: True hvis feltet er et simpleContent-element – tekstindholdet
|
||||
er selve værdien og attributterne er metadata (fx MonAmnt_Type)
|
||||
er_choice: True hvis feltet er ét alternativ i en choice
|
||||
"""
|
||||
navn: str
|
||||
er_attribut: bool = False
|
||||
xsd_type: str = "xsd:string"
|
||||
påkrævet: bool = False
|
||||
er_liste: bool = False
|
||||
er_tekst: bool = False
|
||||
er_choice: bool = False
|
||||
xml_navn: str = "" # ← originalt XML-navn, sættes hvis navn omdøbes
|
||||
|
||||
@property
|
||||
def felt_ref(self) -> str:
|
||||
# Brug xml_navn hvis det er sat (dvs. navn er omdøbt)
|
||||
ref_navn = self.xml_navn if self.xml_navn else self.navn
|
||||
return f"@{ref_navn}" if self.er_attribut else ref_navn
|
||||
|
||||
@property
|
||||
def yaml_navn(self) -> str:
|
||||
return self.navn
|
||||
|
||||
|
||||
@dataclass
|
||||
class XsdSkabelonRef:
|
||||
"""
|
||||
Repræsenterer en reference til en anden navngiven complexType.
|
||||
Bliver til '- skabelon: xxx' i YAML.
|
||||
|
||||
element_navn: Elementnavnet i XML, fx 'Name', 'Address'
|
||||
skabelon_navn: Det afledte skabelonnavn, fx 'nameperson', 'address'
|
||||
er_liste: True hvis maxOccurs > 1
|
||||
påkrævet: True hvis minOccurs != 0
|
||||
er_choice: True hvis elementet er ét alternativ i en choice
|
||||
"""
|
||||
element_navn: str
|
||||
skabelon_navn: str
|
||||
er_liste: bool = False
|
||||
påkrævet: bool = False
|
||||
er_choice: bool = False
|
||||
|
||||
@property
|
||||
def prefix_felt(self) -> str:
|
||||
return self.element_navn
|
||||
|
||||
@property
|
||||
def prefix_navn(self) -> str:
|
||||
navn = self.element_navn.lower()
|
||||
return f"{navn.replace('.', '_').replace('-', '_')}_"
|
||||
|
||||
|
||||
@dataclass
|
||||
class XsdKompleksType:
|
||||
"""
|
||||
Repræsenterer en navngiven complexType – bliver til en skabelon.
|
||||
|
||||
navn: Typens navn, fx 'PersonParty_Type'
|
||||
felter: Liste af XsdFelt og/eller XsdSkabelonRef
|
||||
er_ekstern: True hvis typen kommer fra en importeret XSD-fil
|
||||
har_tekst: True hvis typen bruger simpleContent (fx MonAmnt_Type)
|
||||
"""
|
||||
navn: str
|
||||
felter: list[XsdFelt | XsdSkabelonRef] = field(default_factory=list)
|
||||
er_ekstern: bool = False
|
||||
har_tekst: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class XsdListeElement:
|
||||
"""
|
||||
Repræsenterer et maxOccurs > 1 element – bliver til en udtræk-fil.
|
||||
|
||||
element_navn: Elementets navn, fx 'CryptoUsers'
|
||||
type_navn: Navngiven complexType, fx 'CryptoUsers_Type'
|
||||
rod_sti: Fuld dot-separeret sti, fx 'CARF_OECD.CARFBody.CryptoUsers'
|
||||
overliggende: Navne på overliggende liste-elementer
|
||||
felter: Felter fra anonym inline type (kun hvis type_navn er tom)
|
||||
er_simpel: True hvis antal felter < min_felter – foldes ind som join
|
||||
join_i_skabelon: Skabelonnavn der får join-kolonnen
|
||||
"""
|
||||
element_navn: str
|
||||
type_navn: str = ""
|
||||
rod_sti: str = ""
|
||||
overliggende: list[str] = field(default_factory=list)
|
||||
felter: list[XsdFelt | XsdSkabelonRef] = field(default_factory=list)
|
||||
er_simpel: bool = False
|
||||
join_i_skabelon: str = ""
|
||||
625
xsd_til_yaml.py
625
xsd_til_yaml.py
@@ -0,0 +1,625 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
xsd_til_yaml.py
|
||||
|
||||
Genererer YAML-skelet fra en XSD-fil til brug med udpak_semistruktur.
|
||||
|
||||
Producerer:
|
||||
- skabeloner.yaml – feltskabeloner for alle navngivne complexTypes
|
||||
- nøgler.yaml – nøgle-placeholders for overliggende liste-niveauer
|
||||
- udtræk_<navn>.yaml – én fil per maxOccurs > 1 element
|
||||
- config_<navn>.yaml – hoved-config der inkluderer alle filer
|
||||
|
||||
Kør med:
|
||||
python3 xsd_til_yaml.py --config xsd_til_yaml_config.yaml
|
||||
"""
|
||||
|
||||
from email import parser
|
||||
import os
|
||||
import argparse
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
from schema_dataklasser import XsdFelt, XsdSkabelonRef, XsdKompleksType, XsdListeElement
|
||||
|
||||
XS_NS = "http://www.w3.org/2001/XMLSchema"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Fase 1: Hjælpefunktioner
|
||||
# ============================================================
|
||||
def _byg_choice_søskende_index(liste_elementer: list) -> dict[str, list[str]]:
|
||||
"""
|
||||
Returnerer dict: element_navn → liste af rod_stier
|
||||
for elementer der forekommer i flere choice-grene.
|
||||
"""
|
||||
fra_navn: dict[str, list[str]] = {}
|
||||
for el in liste_elementer:
|
||||
fra_navn.setdefault(el.element_navn, []).append(el.rod_sti)
|
||||
# Behold kun dem der har mere end én sti
|
||||
return {navn: stier for navn, stier in fra_navn.items() if len(stier) > 1}
|
||||
|
||||
|
||||
def _find_namespaces(xsd_sti: str) -> dict[str, str]:
|
||||
"""Læser alle namespace-deklarationer fra XSD-filen."""
|
||||
namespaces = {}
|
||||
for event, elem in ET.iterparse(xsd_sti, events=["start-ns"]):
|
||||
prefix, uri = elem
|
||||
namespaces[prefix] = uri
|
||||
return namespaces
|
||||
|
||||
|
||||
def _byg_xs_tag(lokalt_navn: str) -> str:
|
||||
"""Bygger fuldt kvalificeret XS-tag."""
|
||||
return f"{{{XS_NS}}}{lokalt_navn}"
|
||||
|
||||
|
||||
def _strip_prefix(type_navn: str) -> str:
|
||||
"""
|
||||
Fjerner namespace-præfiks fra et typenavn.
|
||||
'carf:PersonParty_Type' → 'PersonParty_Type'
|
||||
'xsd:string' → 'xsd:string' (bevares)
|
||||
"""
|
||||
if not type_navn or ":" not in type_navn:
|
||||
return type_navn
|
||||
prefix, lokalt = type_navn.split(":", 1)
|
||||
if prefix in ("xs", "xsd"):
|
||||
return type_navn
|
||||
return lokalt
|
||||
|
||||
|
||||
def _er_xs_type(type_navn: str) -> bool:
|
||||
"""True hvis typen er en XSD-standardtype."""
|
||||
if not type_navn:
|
||||
return False
|
||||
return type_navn.startswith("xs:") or type_navn.startswith("xsd:")
|
||||
|
||||
|
||||
def _er_liste(max_occ: str) -> bool:
|
||||
"""True hvis maxOccurs > 1."""
|
||||
if max_occ == "unbounded":
|
||||
return True
|
||||
try:
|
||||
return int(max_occ) > 1
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
|
||||
def _type_til_skabelon_navn(type_navn: str) -> str:
|
||||
"""
|
||||
'PersonParty_Type' → 'personparty'
|
||||
'carf:Address_Type' → 'address'
|
||||
"""
|
||||
lokalt = _strip_prefix(type_navn)
|
||||
return lokalt.replace("_Type", "").replace("Type", "").lower()
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Fase 2: Parser
|
||||
# ============================================================
|
||||
|
||||
class XsdParser:
|
||||
|
||||
def __init__(self, xsd_sti: str):
|
||||
self.xsd_sti = xsd_sti
|
||||
self.tree = ET.parse(xsd_sti)
|
||||
self.root = self.tree.getroot()
|
||||
self.namespaces = _find_namespaces(xsd_sti)
|
||||
self.ns_url_til_prefix = {v: k for k, v in self.namespaces.items()}
|
||||
self.target_ns = self.root.get("targetNamespace", "")
|
||||
self.komplekse_typer: dict[str, XsdKompleksType] = {}
|
||||
self.liste_elementer: list[XsdListeElement] = []
|
||||
self.choice_elementer: list = [] # ← tilføj denne linje
|
||||
|
||||
def _xs(self, navn: str) -> str:
|
||||
return _byg_xs_tag(navn)
|
||||
|
||||
def _er_intern_type(self, type_navn: str) -> bool:
|
||||
if not type_navn or _er_xs_type(type_navn):
|
||||
return False
|
||||
if ":" not in type_navn:
|
||||
return True
|
||||
prefix = type_navn.split(":")[0]
|
||||
return self.namespaces.get(prefix, "") == self.target_ns
|
||||
|
||||
def parse(self, min_felter: int = 1) -> None:
|
||||
self._registrer_eksterne_typer()
|
||||
self._parse_alle_komplekse_typer()
|
||||
self._find_liste_elementer(min_felter)
|
||||
# Kør skabelon-parsing igen nu vi ved hvilke elementer der har egne filer
|
||||
self._parse_alle_komplekse_typer()
|
||||
self._registrer_choice_elementer()
|
||||
|
||||
def _registrer_eksterne_typer(self) -> None:
|
||||
for import_node in self.root.findall(self._xs("import")):
|
||||
ns_url = import_node.get("namespace", "")
|
||||
schema_sti = import_node.get("schemaLocation", "")
|
||||
if schema_sti:
|
||||
import_sti = os.path.join(
|
||||
os.path.dirname(self.xsd_sti), schema_sti
|
||||
)
|
||||
if os.path.exists(import_sti):
|
||||
self._parse_importeret_fil(import_sti)
|
||||
continue
|
||||
prefix = self.ns_url_til_prefix.get(ns_url, "")
|
||||
print(f" Info: Ekstern namespace '{prefix}' ({ns_url}) – "
|
||||
f"kan ikke læse felter")
|
||||
|
||||
def _parse_importeret_fil(self, sti: str) -> None:
|
||||
try:
|
||||
root = ET.parse(sti).getroot()
|
||||
except Exception as e:
|
||||
print(f" Advarsel: Kunne ikke parse '{sti}': {e}")
|
||||
return
|
||||
|
||||
# Pas 1: registrer alle typenavne
|
||||
for ct in root.findall(self._xs("complexType")):
|
||||
navn = ct.get("name")
|
||||
if navn and navn not in self.komplekse_typer:
|
||||
self.komplekse_typer[navn] = XsdKompleksType(navn=navn, er_ekstern=False)
|
||||
|
||||
# Pas 2: udpak felter – to gange så forward-references løses
|
||||
for _ in range(2):
|
||||
for ct in root.findall(self._xs("complexType")):
|
||||
navn = ct.get("name")
|
||||
if not navn:
|
||||
continue
|
||||
kt = self.komplekse_typer[navn]
|
||||
kt.felter = self._udpak_felter(ct)
|
||||
if ct.find(self._xs("simpleContent")) is not None:
|
||||
kt.har_tekst = True
|
||||
# Omdøb #text til type-navnet
|
||||
type_basis = navn.replace("_Type", "").replace("Type", "").lower()
|
||||
for felt in kt.felter:
|
||||
if isinstance(felt, XsdFelt) and felt.navn == "#text":
|
||||
print(f" DEBUG omdøb: {navn} #text → {type_basis}, xml_navn={felt.xml_navn}")
|
||||
felt.xml_navn = "#text"
|
||||
felt.navn = "value" # ← generisk navn
|
||||
print(f" DEBUG efter: navn={felt.navn} xml_navn={felt.xml_navn} felt_ref={felt.felt_ref}")
|
||||
|
||||
# Rekursive imports
|
||||
for import_node in root.findall(self._xs("import")):
|
||||
schema_sti = import_node.get("schemaLocation", "")
|
||||
if schema_sti:
|
||||
import_sti = os.path.join(os.path.dirname(sti), schema_sti)
|
||||
if os.path.exists(import_sti):
|
||||
self._parse_importeret_fil(import_sti)
|
||||
|
||||
def _parse_alle_komplekse_typer(self) -> None:
|
||||
"""To-pas parsing så forward-references løses."""
|
||||
# Pas 1: registrer alle typenavne
|
||||
for ct in self.root.findall(self._xs("complexType")):
|
||||
navn = ct.get("name")
|
||||
if navn and navn not in self.komplekse_typer:
|
||||
self.komplekse_typer[navn] = XsdKompleksType(navn=navn)
|
||||
|
||||
# Pas 2: udpak felter – to gange så forward-references løses
|
||||
for _ in range(2):
|
||||
for ct in self.root.findall(self._xs("complexType")):
|
||||
navn = ct.get("name")
|
||||
if not navn:
|
||||
continue
|
||||
kt = self.komplekse_typer[navn]
|
||||
kt.felter = self._udpak_felter(ct)
|
||||
if ct.find(self._xs("simpleContent")) is not None:
|
||||
kt.har_tekst = True
|
||||
# Omdøb #text til type-navnet
|
||||
type_basis = navn.replace("_Type", "").replace("Type", "").lower()
|
||||
for felt in kt.felter:
|
||||
if isinstance(felt, XsdFelt) and felt.navn == "#text":
|
||||
print(f" DEBUG omdøb: {navn} #text → {type_basis}, xml_navn={felt.xml_navn}")
|
||||
felt.xml_navn = "#text"
|
||||
felt.navn = "value" # ← generisk navn
|
||||
print(f" DEBUG efter: navn={felt.navn} xml_navn={felt.xml_navn} felt_ref={felt.felt_ref}")
|
||||
|
||||
def _udpak_felter(self, ct_node) -> list:
|
||||
"""Koordinator – detekterer XSD-mønster og kalder rette hjælpefunktion."""
|
||||
simple = ct_node.find(self._xs("simpleContent"))
|
||||
if simple is not None:
|
||||
return self._udpak_simple_content(simple)
|
||||
|
||||
complex_content = ct_node.find(self._xs("complexContent"))
|
||||
if complex_content is not None:
|
||||
return self._udpak_complex_content(complex_content)
|
||||
|
||||
felter = self._udpak_sequence_eller_choice(ct_node)
|
||||
felter.extend(self._udpak_attributter(ct_node))
|
||||
return felter
|
||||
|
||||
def _udpak_attributter(self, node) -> list:
|
||||
felter = []
|
||||
for attr in node.findall(self._xs("attribute")):
|
||||
navn = attr.get("name")
|
||||
if not navn:
|
||||
continue
|
||||
felter.append(XsdFelt(
|
||||
navn=navn,
|
||||
er_attribut=True,
|
||||
xsd_type=attr.get("type", "xsd:string"),
|
||||
påkrævet=attr.get("use") == "required"
|
||||
))
|
||||
return felter
|
||||
|
||||
def _udpak_simple_content(self, simple_node, element_navn: str = "#text") -> list:
|
||||
felter = []
|
||||
ext = simple_node.find(self._xs("extension"))
|
||||
if ext is None:
|
||||
ext = simple_node.find(self._xs("restriction"))
|
||||
if ext is not None:
|
||||
felter.append(XsdFelt(
|
||||
navn=element_navn, # ← brug element_navn i stedet for #text
|
||||
xsd_type=ext.get("base", "xsd:string"),
|
||||
påkrævet=True,
|
||||
er_tekst=True
|
||||
))
|
||||
felter.extend(self._udpak_attributter(ext))
|
||||
return felter
|
||||
|
||||
def _udpak_complex_content(self, complex_node) -> list:
|
||||
"""Arv via extension/restriction."""
|
||||
felter = []
|
||||
node = complex_node.find(self._xs("extension"))
|
||||
if node is None:
|
||||
node = complex_node.find(self._xs("restriction"))
|
||||
if node is None:
|
||||
return felter
|
||||
base_navn = _strip_prefix(node.get("base", ""))
|
||||
if base_navn and base_navn in self.komplekse_typer:
|
||||
felter.extend(self.komplekse_typer[base_navn].felter)
|
||||
felter.extend(self._udpak_sequence_eller_choice(node))
|
||||
felter.extend(self._udpak_attributter(node))
|
||||
return felter
|
||||
|
||||
def _udpak_sequence_eller_choice(self, node, prefix: str = "") -> list:
|
||||
felter = []
|
||||
for seq in node.findall(self._xs("sequence")):
|
||||
felter.extend(self._udpak_container(seq, prefix, er_choice=False))
|
||||
for choice in node.findall(self._xs("choice")):
|
||||
felter.extend(self._udpak_container(choice, prefix, er_choice=True))
|
||||
return felter
|
||||
|
||||
def _udpak_container(self, container, prefix: str = "", er_choice: bool = False) -> list:
|
||||
"""Udpakker elementer fra én sequence eller choice."""
|
||||
felter = []
|
||||
for el in container.findall(self._xs("element")):
|
||||
el_navn = el.get("name")
|
||||
el_type = el.get("type", "")
|
||||
max_occ = el.get("maxOccurs", "1")
|
||||
min_occ = el.get("minOccurs", "1")
|
||||
er_liste = _er_liste(max_occ)
|
||||
påkrævet = min_occ != "0"
|
||||
|
||||
if not el_navn:
|
||||
continue
|
||||
|
||||
fuldt_navn = f"{prefix}.{el_navn}" if prefix else el_navn
|
||||
|
||||
# Navngiven kompleks type → XsdSkabelonRef
|
||||
if el_type and not _er_xs_type(el_type):
|
||||
type_navn_rent = _strip_prefix(el_type)
|
||||
kt = self.komplekse_typer.get(type_navn_rent)
|
||||
if kt and not kt.er_ekstern:
|
||||
felter.append(XsdSkabelonRef(
|
||||
element_navn=fuldt_navn,
|
||||
skabelon_navn=_type_til_skabelon_navn(el_type),
|
||||
er_liste=er_liste,
|
||||
påkrævet=påkrævet,
|
||||
er_choice=er_choice
|
||||
))
|
||||
else:
|
||||
# Ekstern eller ukendt → simpelt felt
|
||||
felter.append(XsdFelt(
|
||||
navn=fuldt_navn,
|
||||
xsd_type=el_type,
|
||||
påkrævet=påkrævet,
|
||||
er_liste=er_liste,
|
||||
er_choice=er_choice
|
||||
))
|
||||
continue
|
||||
|
||||
# Anonym inline complexType
|
||||
inline_ct = el.find(self._xs("complexType"))
|
||||
if inline_ct is not None:
|
||||
sc = inline_ct.find(self._xs("simpleContent"))
|
||||
if sc is not None:
|
||||
# simpleContent – elementnavnet som prefix
|
||||
for felt in self._udpak_simple_content(sc):
|
||||
if felt.navn == "#text":
|
||||
felt.navn = fuldt_navn # ← brug element-navnet
|
||||
felt.er_tekst = False
|
||||
else:
|
||||
felt.navn = f"{fuldt_navn}_{felt.navn}"
|
||||
felt.er_tekst = False
|
||||
felter.append(felt)
|
||||
else:
|
||||
felter.extend(self._udpak_sequence_eller_choice(inline_ct, prefix=fuldt_navn))
|
||||
felter.extend(self._udpak_attributter(inline_ct))
|
||||
continue
|
||||
|
||||
# Simpelt xs:-element
|
||||
felter.append(XsdFelt(
|
||||
navn=fuldt_navn,
|
||||
xsd_type=el_type or "xsd:string",
|
||||
påkrævet=påkrævet,
|
||||
er_liste=er_liste,
|
||||
er_choice=er_choice
|
||||
))
|
||||
|
||||
# Nested containers
|
||||
for nested_seq in container.findall(self._xs("sequence")):
|
||||
felter.extend(self._udpak_container(nested_seq, prefix, er_choice))
|
||||
for nested_choice in container.findall(self._xs("choice")):
|
||||
felter.extend(self._udpak_container(nested_choice, prefix, er_choice=True))
|
||||
|
||||
return felter
|
||||
|
||||
def _find_liste_elementer(self, min_felter: int = 1) -> None:
|
||||
rod_el = self.root.find(self._xs("element"))
|
||||
if rod_el is None:
|
||||
return
|
||||
rod_navn = rod_el.get("name", "rod")
|
||||
rod_type = rod_el.get("type", "")
|
||||
|
||||
# Registrer rod-elementet selv som udtræk-punkt
|
||||
rod_inline_ct = rod_el.find(self._xs("complexType"))
|
||||
|
||||
if rod_type:
|
||||
inline_felter = []
|
||||
elif rod_inline_ct is not None:
|
||||
inline_felter = self._udpak_felter(rod_inline_ct)
|
||||
else:
|
||||
inline_felter = []
|
||||
|
||||
self.liste_elementer.append(XsdListeElement(
|
||||
element_navn=rod_navn,
|
||||
type_navn=rod_type,
|
||||
rod_sti=rod_navn,
|
||||
overliggende=[],
|
||||
felter=inline_felter,
|
||||
er_simpel=False,
|
||||
join_i_skabelon=""
|
||||
))
|
||||
|
||||
# Traverser resten af XSD-træet
|
||||
self._traverser(
|
||||
node=rod_el,
|
||||
sti=[rod_navn],
|
||||
overliggende=[],
|
||||
type_navn=rod_type,
|
||||
min_felter=min_felter
|
||||
)
|
||||
|
||||
def _traverser(self, node, sti, overliggende, type_navn="", min_felter=1) -> None:
|
||||
ct_node = self._hent_ct_node(node, type_navn)
|
||||
if ct_node is None:
|
||||
return
|
||||
for tag in [self._xs("sequence"), self._xs("choice")]:
|
||||
for container in ct_node.findall(tag):
|
||||
self._traverser_container(container, sti, overliggende, min_felter)
|
||||
|
||||
def _hent_ct_node(self, node, type_navn: str):
|
||||
if type_navn:
|
||||
navn_rent = _strip_prefix(type_navn)
|
||||
for ct in self.root.findall(self._xs("complexType")):
|
||||
if ct.get("name") == navn_rent:
|
||||
return ct
|
||||
return None
|
||||
return node.find(self._xs("complexType"))
|
||||
|
||||
def _traverser_container(self, container, sti, overliggende, min_felter) -> None:
|
||||
for el in container.findall(self._xs("element")):
|
||||
el_navn = el.get("name")
|
||||
el_type = el.get("type", "")
|
||||
max_occ = el.get("maxOccurs", "1")
|
||||
if not el_navn:
|
||||
continue
|
||||
ny_sti = sti + [el_navn]
|
||||
|
||||
if _er_liste(max_occ):
|
||||
rod_sti = ".".join(ny_sti)
|
||||
inline_felter = []
|
||||
inline_ct = el.find(self._xs("complexType"))
|
||||
if inline_ct is not None and not el_type:
|
||||
inline_felter = self._udpak_felter(inline_ct)
|
||||
|
||||
antal = self._tæl_felter(el_type, inline_felter)
|
||||
er_simpel = antal < min_felter
|
||||
join_i = self._find_join_skabelon(overliggende) if er_simpel else ""
|
||||
|
||||
self.liste_elementer.append(XsdListeElement(
|
||||
element_navn=el_navn,
|
||||
type_navn=el_type,
|
||||
rod_sti=rod_sti,
|
||||
overliggende=list(overliggende),
|
||||
felter=inline_felter,
|
||||
er_simpel=er_simpel,
|
||||
join_i_skabelon=join_i
|
||||
))
|
||||
self._traverser(el, ny_sti, overliggende + [el_navn], el_type, min_felter)
|
||||
else:
|
||||
inline_ct = el.find(self._xs("complexType"))
|
||||
if el_type and not _er_xs_type(el_type):
|
||||
self._traverser(el, ny_sti, overliggende, el_type, min_felter)
|
||||
elif inline_ct is not None:
|
||||
self._traverser(el, ny_sti, overliggende, "", min_felter)
|
||||
|
||||
for tag in [self._xs("sequence"), self._xs("choice")]:
|
||||
for nested in container.findall(tag):
|
||||
self._traverser_container(nested, sti, overliggende, min_felter)
|
||||
|
||||
def _tæl_felter(self, type_navn: str, inline_felter: list) -> int:
|
||||
if inline_felter:
|
||||
return len(inline_felter)
|
||||
if not type_navn or _er_xs_type(type_navn):
|
||||
return 1
|
||||
kt = self.komplekse_typer.get(_strip_prefix(type_navn))
|
||||
if kt:
|
||||
if kt.er_ekstern:
|
||||
navn = _strip_prefix(type_navn)
|
||||
if any(s in navn for s in ("String", "Code", "Enum")) and \
|
||||
not any(s in navn for s in ("Party", "Person", "Organisation", "Address")):
|
||||
return 1
|
||||
return 99
|
||||
return len(kt.felter)
|
||||
return 1
|
||||
|
||||
def _find_join_skabelon(self, overliggende: list[str]) -> str:
|
||||
if not overliggende:
|
||||
return ""
|
||||
nærmeste = overliggende[-1]
|
||||
for el in self.liste_elementer:
|
||||
if el.element_navn == nærmeste and el.type_navn:
|
||||
return _type_til_skabelon_navn(el.type_navn)
|
||||
return nærmeste.lower()
|
||||
|
||||
def _registrer_choice_elementer(self) -> None:
|
||||
"""Finder alle choice-elementer i liste-elementerne."""
|
||||
from schema_dataklasser import XsdChoiceElement
|
||||
|
||||
for liste_el in self.liste_elementer:
|
||||
type_navn_rent = _strip_prefix(liste_el.type_navn) if liste_el.type_navn else ""
|
||||
kt = self.komplekse_typer.get(type_navn_rent)
|
||||
if not kt or not kt.felter:
|
||||
continue
|
||||
|
||||
# Find alle choice-grupper i skabelonen
|
||||
choice_refs = [f for f in kt.felter
|
||||
if isinstance(f, XsdSkabelonRef) and f.er_choice]
|
||||
if not choice_refs:
|
||||
continue
|
||||
|
||||
# Grupper efter element_navn's rod-del (fx 'UserID')
|
||||
grupper: dict[str, list] = {}
|
||||
for ref in choice_refs:
|
||||
rod = ref.element_navn.split(".")[0] if "." in ref.element_navn else ref.element_navn
|
||||
grupper.setdefault(rod, []).append(ref)
|
||||
|
||||
for rod_navn, refs in grupper.items():
|
||||
alternativer = []
|
||||
for ref in refs:
|
||||
alt_navn = ref.element_navn.split(".")[-1] if "." in ref.element_navn else ref.element_navn
|
||||
alt_sti = f"{liste_el.rod_sti}.{ref.element_navn}"
|
||||
alternativer.append((alt_navn, ref.skabelon_navn, alt_sti))
|
||||
|
||||
self.choice_elementer.append(XsdChoiceElement(
|
||||
element_navn=rod_navn,
|
||||
rod_sti=liste_el.rod_sti,
|
||||
overliggende=liste_el.overliggende + [liste_el.element_navn]
|
||||
if liste_el.overliggende is not None else [liste_el.element_navn],
|
||||
alternativer=alternativer
|
||||
))
|
||||
# ============================================================
|
||||
# CLI og main
|
||||
# ============================================================
|
||||
|
||||
def læs_config_fil(config_sti: str) -> dict:
|
||||
import yaml
|
||||
if not os.path.exists(config_sti):
|
||||
raise FileNotFoundError(f"Config-filen '{config_sti}' findes ikke.")
|
||||
with open(config_sti, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
|
||||
|
||||
def byg_argument_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Genererer YAML-skelet fra XSD til udpak_semistruktur."
|
||||
)
|
||||
parser.add_argument("--config", help="Sti til xsd_til_yaml_config.yaml")
|
||||
parser.add_argument("--xsd", help="Sti til XSD-filen")
|
||||
parser.add_argument("--output", help="Output-mappe til YAML-filer")
|
||||
parser.add_argument("--prefix", help="Prefix til udtræk-filnavne (standard: udtræk)")
|
||||
parser.add_argument("--min_felter", type=int,
|
||||
help="Elementer med færre felter foldes ind som join (standard: 1)")
|
||||
parser.add_argument("--db_schema", help="Database schema (standard: dbo)")
|
||||
parser.add_argument("--tabel_prefix",help="Prefix til tabelnavne")
|
||||
parser.add_argument("--output_type", help="Output-type: fil, tabel eller begge (standard: begge)")
|
||||
parser.add_argument("--historik", help="Historik-type: t1 eller t2 (standard: t1)")
|
||||
return parser
|
||||
|
||||
|
||||
def main():
|
||||
parser = byg_argument_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
cfg = {}
|
||||
if args.config:
|
||||
cfg = læs_config_fil(args.config)
|
||||
|
||||
xsd_sti = args.xsd or cfg.get("xsd")
|
||||
output = args.output or cfg.get("output")
|
||||
prefix = args.prefix or cfg.get("prefix", "udtræk")
|
||||
min_felter = args.min_felter or cfg.get("min_felter", 1)
|
||||
forkortelser = cfg.get("forkortelser", {})
|
||||
db_schema = args.db_schema or cfg.get("db_schema", "dbo")
|
||||
tabel_prefix = args.tabel_prefix or cfg.get("tabel_prefix", "")
|
||||
output_type = args.output_type or cfg.get("output_type", "begge")
|
||||
historik = args.historik or cfg.get("historik", "t1")
|
||||
|
||||
if not xsd_sti:
|
||||
print("FEJL: 'xsd' skal angives – enten i config-fil eller med --xsd")
|
||||
return
|
||||
if not output:
|
||||
print("FEJL: 'output' skal angives – enten i config-fil eller med --output")
|
||||
return
|
||||
if not os.path.exists(xsd_sti):
|
||||
print(f"FEJL: XSD-filen '{xsd_sti}' findes ikke.")
|
||||
return
|
||||
|
||||
os.makedirs(output, exist_ok=True)
|
||||
|
||||
print(f"Parser XSD: {xsd_sti}")
|
||||
if forkortelser:
|
||||
print(f"Forkortelser: {len(forkortelser)} defineret")
|
||||
|
||||
xsd = XsdParser(xsd_sti)
|
||||
xsd.parse(min_felter=min_felter)
|
||||
|
||||
print(f"\nFandt {len(xsd.komplekse_typer)} komplekse typer:")
|
||||
for navn, kt in xsd.komplekse_typer.items():
|
||||
ekstern = " (ekstern)" if kt.er_ekstern else ""
|
||||
print(f" {navn}{ekstern}: {len(kt.felter)} felter")
|
||||
|
||||
print(f"\nFandt {len(xsd.liste_elementer)} liste-elementer:")
|
||||
for el in xsd.liste_elementer:
|
||||
simpel = f" [join → {el.join_i_skabelon}]" if el.er_simpel else ""
|
||||
print(f" {el.element_navn}{simpel} ({el.rod_sti})")
|
||||
if el.overliggende:
|
||||
print(f" Overliggende: {el.overliggende}")
|
||||
|
||||
print(f"\nGenererer YAML-filer i: {output}")
|
||||
|
||||
from yaml_generator import (
|
||||
generer_skabeloner_yaml,
|
||||
generer_nøgler_yaml,
|
||||
generer_udtræk_yaml,
|
||||
generer_choice_udtræk_yaml,
|
||||
generer_hoved_config
|
||||
)
|
||||
|
||||
liste_navne = {el.element_navn for el in xsd.liste_elementer}
|
||||
generer_skabeloner_yaml(
|
||||
xsd.komplekse_typer, output, forkortelser, liste_navne, xsd.liste_elementer
|
||||
)
|
||||
generer_nøgler_yaml(xsd.liste_elementer, output, forkortelser)
|
||||
generer_udtræk_yaml(
|
||||
xsd.liste_elementer, xsd.komplekse_typer,
|
||||
output, prefix, min_felter, forkortelser,
|
||||
db_schema, tabel_prefix, output_type, historik
|
||||
)
|
||||
generer_choice_udtræk_yaml(
|
||||
xsd.choice_elementer,
|
||||
xsd.komplekse_typer,
|
||||
xsd.liste_elementer,
|
||||
output, prefix, forkortelser,
|
||||
db_schema, tabel_prefix,
|
||||
output_type, historik
|
||||
)
|
||||
|
||||
generer_hoved_config(
|
||||
xsd.liste_elementer, xsd_sti, output, prefix, forkortelser
|
||||
)
|
||||
|
||||
print("\nFærdig.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
39
xsd_til_yaml_config.yaml
Normal file
39
xsd_til_yaml_config.yaml
Normal file
@@ -0,0 +1,39 @@
|
||||
# xsd_til_yaml_config.yaml
|
||||
# Konfiguration til xsd_til_yaml.py for CARF DAC8
|
||||
|
||||
xsd: /home/carsten/Hentet/xml-schema-carf/CARFXML_v1.4.xsd
|
||||
output: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/
|
||||
prefix: udtræk
|
||||
min_felter: 2
|
||||
|
||||
# Database schema – bruges til tabel-navne
|
||||
db_schema: dbo
|
||||
|
||||
# Tabel-navn prefix – fx 'DAC8_' giver 'dbo.DAC8_CARFBody_tmp'
|
||||
tabel_prefix: DAC8_
|
||||
|
||||
# Output-type: fil, tabel, eller begge
|
||||
output_type: fil # fil | tabel | begge
|
||||
|
||||
# Historik-type for tabel-output
|
||||
historik: t2 # t1 | t2
|
||||
|
||||
|
||||
forkortelser:
|
||||
CryptoToCryptoIn: CTI
|
||||
CryptoToCryptoOut: CTO
|
||||
CryptoFiatIn: CFI
|
||||
CryptoFiatOut: CFO
|
||||
CryptoTransferIn: CTRI
|
||||
CryptoTransferOut: CTRO
|
||||
TransferWallet: TW
|
||||
RelevantTransactions: RT
|
||||
ControllingPerson: CP
|
||||
NumberofTransactions: NoTrans
|
||||
NumberofUnits: NumUnits
|
||||
ExchangeType: ExchType
|
||||
AltValuation: AltVal
|
||||
Organisation: Org
|
||||
Individual: Ind
|
||||
ResCountryCode: ResCC
|
||||
CryptoAsset: CAs
|
||||
38
xsd_til_yaml_config_bank.yaml
Normal file
38
xsd_til_yaml_config_bank.yaml
Normal file
@@ -0,0 +1,38 @@
|
||||
# xsd_til_yaml_config.yaml
|
||||
# Konfiguration til xsd_til_yaml.py for CARF DAC8
|
||||
|
||||
xsd: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/data/bankdata.xsd
|
||||
output: /home/carsten/Dokumenter/GitClone/udpak_semistruktur/tests/bank_test/output/DAC8/
|
||||
prefix: udtræk
|
||||
min_felter: 2
|
||||
|
||||
# Database schema – bruges til tabel-navne
|
||||
db_schema: dbo
|
||||
|
||||
# Tabel-navn prefix – fx 'DAC8_' giver 'dbo.DAC8_CARFBody_tmp'
|
||||
tabel_prefix: bank_
|
||||
|
||||
# Output-type: fil, tabel, eller begge
|
||||
output_type: fil # fil | tabel | begge
|
||||
|
||||
# Historik-type for tabel-output
|
||||
historik: t2 # t1 | t2
|
||||
|
||||
|
||||
#forkortelser:
|
||||
# CryptoToCryptoIn: CTI
|
||||
# CryptoToCryptoOut: CTO
|
||||
# CryptoFiatIn: CFI
|
||||
# CryptoFiatOut: CFO
|
||||
# CryptoTransferIn: CTRI
|
||||
# CryptoTransferOut: CTRO
|
||||
# TransferWallet: TW
|
||||
# RelevantTransactions: RT
|
||||
# ControllingPerson: CP
|
||||
# NumberofTransactions: NoTrans
|
||||
# NumberofUnits: NumUnits
|
||||
# ExchangeType: ExchType
|
||||
# AltValuation: AltVal
|
||||
# Organisation: Org
|
||||
# Individual: Ind
|
||||
# ResCountryCode: ResCC
|
||||
@@ -1,471 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
xsd_til_yaml.py
|
||||
|
||||
Genererer YAML-skelet fra en XSD-fil til brug med udpak_semistruktur.
|
||||
|
||||
Producerer:
|
||||
- skabeloner.yaml – feltskabeloner for alle complexTypes
|
||||
- nøgler.yaml – nøgle-placeholders for overliggende unbounded-niveauer
|
||||
- udtræk_<navn>.yaml – én fil per maxOccurs="unbounded" element
|
||||
|
||||
Kør med:
|
||||
python3 xsd_til_yaml.py --xsd bankdata.xsd --output ./yaml_output/
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import argparse
|
||||
from xml.etree import ElementTree as ET
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Optional
|
||||
|
||||
# XML Schema namespace
|
||||
XS = "http://www.w3.org/2001/XMLSchema"
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Datastrukturer
|
||||
# ============================================================
|
||||
|
||||
@dataclass
|
||||
class XsdFelt:
|
||||
"""Repræsenterer ét felt (attribut eller element) i en complexType."""
|
||||
navn: str
|
||||
er_attribut: bool
|
||||
xsd_type: str = "xs:string"
|
||||
påkrævet: bool = False
|
||||
|
||||
@property
|
||||
def felt_ref(self) -> str:
|
||||
"""Returnerer felt-referencen som den skrives i YAML."""
|
||||
return f"@{self.navn}" if self.er_attribut else self.navn
|
||||
|
||||
|
||||
@dataclass
|
||||
class XsdKompleksType:
|
||||
"""Repræsenterer en complexType fra XSD."""
|
||||
navn: str
|
||||
felter: list[XsdFelt] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class XsdUdboundElement:
|
||||
"""Repræsenterer et maxOccurs=unbounded element – bliver til en output-fil."""
|
||||
element_navn: str # fx "postering"
|
||||
type_navn: str # fx "PosteringType"
|
||||
rod_sti: str # fx "bank.kunder.kunde.konti.konto.posteringer.postering"
|
||||
overliggende: list[str] # navne på overliggende unbounded elementer
|
||||
# fx ["kunde", "konto"]
|
||||
|
||||
|
||||
# ============================================================
|
||||
# XSD Parser
|
||||
# ============================================================
|
||||
|
||||
class XsdParser:
|
||||
"""Parser en XSD-fil og udtrækker complexTypes og unbounded elementer."""
|
||||
|
||||
def __init__(self, xsd_sti: str):
|
||||
self.xsd_sti = xsd_sti
|
||||
self.tree = ET.parse(xsd_sti)
|
||||
self.root = self.tree.getroot()
|
||||
|
||||
# Navngivne complexTypes fra XSD
|
||||
self.komplekse_typer: dict[str, XsdKompleksType] = {}
|
||||
|
||||
# Unbounded elementer med rod-sti og overliggende niveauer
|
||||
self.unbounded_elementer: list[XsdUdboundElement] = []
|
||||
|
||||
def parse(self) -> None:
|
||||
"""Kør den komplette parsing."""
|
||||
self._parse_navngivne_typer()
|
||||
self._find_unbounded_elementer()
|
||||
|
||||
def _xs(self, tag: str) -> str:
|
||||
"""Returnerer fuldt kvalificeret XS-tag."""
|
||||
return f"{{{XS}}}{tag}"
|
||||
|
||||
def _parse_navngivne_typer(self) -> None:
|
||||
"""Find alle navngivne complexTypes på topniveau."""
|
||||
for ct in self.root.findall(self._xs("complexType")):
|
||||
navn = ct.get("name")
|
||||
if not navn:
|
||||
continue
|
||||
felter = self._udpak_felter(ct)
|
||||
self.komplekse_typer[navn] = XsdKompleksType(navn=navn, felter=felter)
|
||||
|
||||
def _udpak_felter(self, ct_node) -> list[XsdFelt]:
|
||||
"""Udpakker alle felter (attributter + simple elementer) fra en complexType."""
|
||||
felter = []
|
||||
|
||||
# Attributter – bliver til @navn i YAML
|
||||
for attr in ct_node.findall(self._xs("attribute")):
|
||||
navn = attr.get("name")
|
||||
xsd_type = attr.get("type", "xs:string")
|
||||
påkrævet = attr.get("use") == "required"
|
||||
felter.append(XsdFelt(
|
||||
navn=navn,
|
||||
er_attribut=True,
|
||||
xsd_type=xsd_type,
|
||||
påkrævet=påkrævet
|
||||
))
|
||||
|
||||
# Sequence-elementer
|
||||
for seq in ct_node.findall(self._xs("sequence")):
|
||||
for el in seq.findall(self._xs("element")):
|
||||
el_type = el.get("type", "xs:string")
|
||||
el_navn = el.get("name")
|
||||
max_occ = el.get("maxOccurs", "1")
|
||||
|
||||
# Spring komplekse typer og unbounded over – de håndteres separat
|
||||
if max_occ == "unbounded":
|
||||
continue
|
||||
if el_type and not el_type.startswith("xs:"):
|
||||
continue
|
||||
|
||||
felter.append(XsdFelt(
|
||||
navn=el_navn,
|
||||
er_attribut=False,
|
||||
xsd_type=el_type or "xs:string",
|
||||
påkrævet=True
|
||||
))
|
||||
|
||||
return felter
|
||||
|
||||
def _find_unbounded_elementer(self) -> None:
|
||||
"""
|
||||
Gennemgår XSD rekursivt og finder alle maxOccurs=unbounded elementer.
|
||||
Bygger rod-stier og registrerer overliggende unbounded niveauer.
|
||||
"""
|
||||
# Find rod-elementet
|
||||
rod_el = self.root.find(self._xs("element"))
|
||||
if rod_el is None:
|
||||
return
|
||||
|
||||
rod_navn = rod_el.get("name", "rod")
|
||||
rod_type = rod_el.get("type")
|
||||
|
||||
self._traverse(
|
||||
node=rod_el,
|
||||
sti=[rod_navn],
|
||||
overliggende_unbounded=[],
|
||||
rod_type=rod_type
|
||||
)
|
||||
|
||||
def _hent_type_node(self, type_navn: str):
|
||||
"""Henter en navngiven complexType-node fra XSD."""
|
||||
for ct in self.root.findall(self._xs("complexType")):
|
||||
if ct.get("name") == type_navn:
|
||||
return ct
|
||||
return None
|
||||
|
||||
def _traverse(
|
||||
self,
|
||||
node,
|
||||
sti: list[str],
|
||||
overliggende_unbounded: list[str],
|
||||
rod_type: Optional[str] = None
|
||||
) -> None:
|
||||
"""
|
||||
Rekursiv gennemgang af XSD-strukturen.
|
||||
Finder unbounded elementer og bygger rod-stier.
|
||||
"""
|
||||
# Find complexType-noden at arbejde med
|
||||
if rod_type:
|
||||
ct_node = self._hent_type_node(rod_type)
|
||||
else:
|
||||
ct_node = node.find(self._xs("complexType"))
|
||||
|
||||
if ct_node is None:
|
||||
return
|
||||
|
||||
# Gennemgå sequence-elementer
|
||||
for seq in ct_node.findall(self._xs("sequence")):
|
||||
for el in seq.findall(self._xs("element")):
|
||||
el_navn = el.get("name")
|
||||
el_type = el.get("type")
|
||||
max_occ = el.get("maxOccurs", "1")
|
||||
|
||||
ny_sti = sti + [el_navn]
|
||||
|
||||
if max_occ == "unbounded":
|
||||
# Dette element bliver en output-fil
|
||||
rod_sti = ".".join(ny_sti)
|
||||
self.unbounded_elementer.append(XsdUdboundElement(
|
||||
element_navn=el_navn,
|
||||
type_navn=el_type or "",
|
||||
rod_sti=rod_sti,
|
||||
overliggende=list(overliggende_unbounded)
|
||||
))
|
||||
# Fortsæt rekursivt med dette som nyt overliggende niveau
|
||||
self._traverse(
|
||||
node=el,
|
||||
sti=ny_sti,
|
||||
overliggende_unbounded=overliggende_unbounded + [el_navn],
|
||||
rod_type=el_type
|
||||
)
|
||||
else:
|
||||
# Ikke unbounded – fortsæt ned hvis det er en kompleks type
|
||||
if el_type and not el_type.startswith("xs:"):
|
||||
self._traverse(
|
||||
node=el,
|
||||
sti=ny_sti,
|
||||
overliggende_unbounded=overliggende_unbounded,
|
||||
rod_type=el_type
|
||||
)
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Hjælpefunktion: XSD type → YAML type
|
||||
# ============================================================
|
||||
|
||||
def xsd_type_til_yaml(xsd_type: str) -> dict:
|
||||
"""Konverterer en XSD-type til YAML kolonne-attributter."""
|
||||
mapping = {
|
||||
"xs:string": {},
|
||||
"xs:decimal": {"type": "decimal", "decimaler": 2},
|
||||
"xs:integer": {"type": "integer"},
|
||||
"xs:int": {"type": "integer"},
|
||||
"xs:boolean": {"type": "boolean"},
|
||||
"xs:date": {"type": "date", "dato_ind": "%Y-%m-%d", "dato_ud": "%d-%m-%Y"},
|
||||
"xs:dateTime": {"type": "date", "dato_ind": "%Y-%m-%dT%H:%M:%S", "dato_ud": "SYBASE"},
|
||||
}
|
||||
return mapping.get(xsd_type, {})
|
||||
|
||||
|
||||
# ============================================================
|
||||
# CLI
|
||||
# ============================================================
|
||||
|
||||
def byg_argument_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Genererer YAML-skelet fra XSD til udpak_semistruktur."
|
||||
)
|
||||
parser.add_argument("--xsd", required=True, help="Sti til XSD-filen")
|
||||
parser.add_argument("--output", required=True, help="Output-mappe til YAML-filer")
|
||||
parser.add_argument("--prefix", default="udtræk",
|
||||
help="Prefix til udtræk-filnavne (standard: udtræk)")
|
||||
return parser
|
||||
|
||||
|
||||
# ============================================================
|
||||
# YAML Generatorer
|
||||
# ============================================================
|
||||
|
||||
def _yaml_type_linjer(xsd_type: str, indryk: str) -> list[str]:
|
||||
"""Returnerer YAML-linjer for typekonvertering baseret på XSD-type."""
|
||||
attrs = xsd_type_til_yaml(xsd_type)
|
||||
linjer = []
|
||||
for k, v in attrs.items():
|
||||
if isinstance(v, str):
|
||||
linjer.append(f'{indryk}{k}: "{v}"')
|
||||
else:
|
||||
linjer.append(f'{indryk}{k}: {v}')
|
||||
return linjer
|
||||
|
||||
|
||||
def generer_skabeloner_yaml(xsd: XsdParser, output_mappe: str) -> None:
|
||||
"""Genererer skabeloner.yaml med feltskabeloner for alle relevante complexTypes."""
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("# skabeloner.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append("# Auto-genereret feltskabeloner fra XSD.")
|
||||
linjer.append("# Relative feltnavne – brug prefix_felt naar skabelonen anvendes.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("kolonne_skabeloner:")
|
||||
linjer.append("")
|
||||
|
||||
relevante = {navn: kt for navn, kt in xsd.komplekse_typer.items() if kt.felter}
|
||||
|
||||
for type_navn, kt in relevante.items():
|
||||
skabelon_navn = type_navn.replace("Type", "").lower()
|
||||
linjer.append(f" # Felter fra {type_navn}")
|
||||
linjer.append(f" {skabelon_navn}:")
|
||||
|
||||
for felt in kt.felter:
|
||||
linjer.append(f" - navn: {felt.navn}")
|
||||
linjer.append(f' felt: "{felt.felt_ref}"')
|
||||
for type_linje in _yaml_type_linjer(felt.xsd_type, " "):
|
||||
linjer.append(type_linje)
|
||||
if felt.påkrævet:
|
||||
linjer.append(f" påkrævet: true")
|
||||
linjer.append("")
|
||||
|
||||
sti = os.path.join(output_mappe, "skabeloner.yaml")
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
|
||||
def generer_nøgler_yaml(xsd: XsdParser, output_mappe: str) -> None:
|
||||
"""Genererer nøgler.yaml med placeholders for forretningsnøgler."""
|
||||
overliggende_niveauer = set()
|
||||
for el in xsd.unbounded_elementer:
|
||||
for ov in el.overliggende:
|
||||
overliggende_niveauer.add(ov)
|
||||
|
||||
if not overliggende_niveauer:
|
||||
print(" Ingen overliggende niveauer – nøgler.yaml ikke genereret.")
|
||||
return
|
||||
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("# nøgler.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append("# Nøgle-skabeloner der binder output-filer til overliggende niveauer.")
|
||||
linjer.append("# UDFYLD: Erstat __FELT__ med den korrekte forretningsnøgle.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("kolonne_skabeloner:")
|
||||
linjer.append("")
|
||||
|
||||
skrevne = set()
|
||||
for el in xsd.unbounded_elementer:
|
||||
if not el.overliggende:
|
||||
continue
|
||||
rod_dele = el.rod_sti.split(".")
|
||||
for ov_navn in el.overliggende:
|
||||
skabelon_navn = f"{ov_navn}_nøgle"
|
||||
if skabelon_navn in skrevne:
|
||||
continue
|
||||
skrevne.add(skabelon_navn)
|
||||
try:
|
||||
idx = rod_dele.index(ov_navn)
|
||||
ov_rod_sti = ".".join(rod_dele[:idx + 1])
|
||||
except ValueError:
|
||||
ov_rod_sti = f"__ROD_STI_TIL_{ov_navn.upper()}__"
|
||||
|
||||
linjer.append(f" # Nøgle fra {ov_navn}-niveau")
|
||||
linjer.append(f" # TODO: Angiv den korrekte forretningsnøgle for {ov_navn}")
|
||||
linjer.append(f" {skabelon_navn}:")
|
||||
linjer.append(f" - navn: __FELT__")
|
||||
linjer.append(" felt: __FELT__ # fx '@id' eller 'nr'")
|
||||
linjer.append(f" rod: {ov_rod_sti}")
|
||||
linjer.append("")
|
||||
|
||||
sti = os.path.join(output_mappe, "nøgler.yaml")
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
|
||||
def generer_udtræk_yaml(xsd: XsdParser, output_mappe: str, prefix: str) -> None:
|
||||
"""Genererer én udtræk-yaml per unbounded element."""
|
||||
for el in xsd.unbounded_elementer:
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append(f"# {prefix}_{el.element_navn}.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append(f"# Auto-genereret udtræk for {el.element_navn}.")
|
||||
if el.overliggende:
|
||||
linjer.append(f"# Udfyld nøgle-skabeloner i nøgler.yaml inden brug.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("output_filer:")
|
||||
linjer.append("")
|
||||
linjer.append(f" - rod: {el.rod_sti}")
|
||||
linjer.append(f" kolonner:")
|
||||
|
||||
if el.overliggende:
|
||||
linjer.append(f" # Nøgler fra overliggende niveauer – udfyld i nøgler.yaml")
|
||||
for ov_navn in el.overliggende:
|
||||
linjer.append(f" - skabelon: {ov_navn}_nøgle")
|
||||
linjer.append("")
|
||||
|
||||
if el.type_navn and el.type_navn in xsd.komplekse_typer:
|
||||
kt = xsd.komplekse_typer[el.type_navn]
|
||||
if kt.felter:
|
||||
skabelon_navn = el.type_navn.replace("Type", "").lower()
|
||||
linjer.append(f" # Felter fra {el.element_navn} – se skabeloner.yaml")
|
||||
linjer.append(f" - skabelon: {skabelon_navn}")
|
||||
|
||||
linjer.append("")
|
||||
linjer.append(f" outputs:")
|
||||
linjer.append(f" - fil_navn: \"{el.element_navn}_{{yyyy}}{{mm}}{{dd}}.txt\"")
|
||||
linjer.append(f" overskrifter: true")
|
||||
linjer.append("")
|
||||
|
||||
fil_navn = f"{prefix}_{el.element_navn}.yaml"
|
||||
sti = os.path.join(output_mappe, fil_navn)
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
|
||||
def generer_hoved_config(xsd: XsdParser, output_mappe: str, prefix: str) -> None:
|
||||
"""Genererer en hoved-config der inkluderer alle genererede filer."""
|
||||
xsd_basis = os.path.splitext(os.path.basename(xsd.xsd_sti))[0]
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append(f"# config_{xsd_basis}.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append("# Auto-genereret hoved-config. Tilpas config-sektionen.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("include:")
|
||||
linjer.append(" - skabeloner.yaml")
|
||||
linjer.append(" - nøgler.yaml")
|
||||
for el in xsd.unbounded_elementer:
|
||||
linjer.append(f" - {prefix}_{el.element_navn}.yaml")
|
||||
linjer.append("")
|
||||
linjer.append("config:")
|
||||
linjer.append(f" input_fil: {xsd_basis}.xml")
|
||||
linjer.append(" output_path: __OUTPUT_PATH__")
|
||||
linjer.append(" logfil: log_{yyyy}{mm}{dd}.txt")
|
||||
linjer.append(" log_niveau: info")
|
||||
linjer.append(" log_output: begge")
|
||||
linjer.append(" encoding: utf-8")
|
||||
linjer.append(" separator: \"\\t\"")
|
||||
linjer.append(" skrivetilstand: w")
|
||||
linjer.append(" dato_ind: \"%Y-%m-%d\"")
|
||||
linjer.append(" dato_ud: \"%d-%m-%Y\"")
|
||||
|
||||
fil_navn = f"config_{xsd_basis}.yaml"
|
||||
sti = os.path.join(output_mappe, fil_navn)
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
|
||||
def main():
|
||||
parser = byg_argument_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
if not os.path.exists(args.xsd):
|
||||
print(f"FEJL: XSD-filen '{args.xsd}' findes ikke.")
|
||||
return
|
||||
|
||||
os.makedirs(args.output, exist_ok=True)
|
||||
|
||||
print(f"Parser XSD: {args.xsd}")
|
||||
xsd = XsdParser(args.xsd)
|
||||
xsd.parse()
|
||||
|
||||
print(f"\nFandt {len(xsd.komplekse_typer)} komplekse typer:")
|
||||
for navn in xsd.komplekse_typer:
|
||||
kt = xsd.komplekse_typer[navn]
|
||||
print(f" {navn}: {len(kt.felter)} felter")
|
||||
|
||||
print(f"\nFandt {len(xsd.unbounded_elementer)} unbounded elementer:")
|
||||
for el in xsd.unbounded_elementer:
|
||||
print(f" {el.element_navn} ({el.rod_sti})")
|
||||
if el.overliggende:
|
||||
print(f" Overliggende: {el.overliggende}")
|
||||
|
||||
print(f"\nGenererer YAML-filer i: {args.output}")
|
||||
generer_skabeloner_yaml(xsd, args.output)
|
||||
generer_nøgler_yaml(xsd, args.output)
|
||||
generer_udtræk_yaml(xsd, args.output, args.prefix)
|
||||
generer_hoved_config(xsd, args.output, args.prefix)
|
||||
|
||||
print("\nFærdig! Husk at:")
|
||||
print(" 1. Udfyld nøgler.yaml med de korrekte forretningsnøgler")
|
||||
print(" 2. Ret output_path i hoved-config")
|
||||
print(" 3. Tjek at rod-stier passer til din XML-struktur")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
660
yaml_generator.py
Normal file
660
yaml_generator.py
Normal file
@@ -0,0 +1,660 @@
|
||||
"""
|
||||
yaml_generator.py
|
||||
|
||||
Fælles YAML-generering til brug med udpak_semistruktur.
|
||||
Importeres af xsd_til_yaml.py, jsonschema_til_yaml.py og elastic_til_yaml.py.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
import os
|
||||
from schema_dataklasser import XsdFelt, XsdSkabelonRef, XsdKompleksType, XsdListeElement
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Hjælpefunktioner
|
||||
# ============================================================
|
||||
|
||||
def _find_fælles_prefix(liste_elementer: list) -> int:
|
||||
if not liste_elementer:
|
||||
return 0
|
||||
# Udelad rod-elementet (første element med korteste sti)
|
||||
alle_stier = [el.rod_sti.split(".") for el in liste_elementer if len(el.rod_sti.split(".")) > 1]
|
||||
if not alle_stier:
|
||||
return 0
|
||||
korteste = min(len(s) for s in alle_stier)
|
||||
fælles = 0
|
||||
for i in range(korteste):
|
||||
if len(set(s[i] for s in alle_stier)) == 1:
|
||||
fælles += 1
|
||||
else:
|
||||
break
|
||||
return min(fælles, korteste - 1)
|
||||
|
||||
|
||||
def _byg_choice_grupper(liste_elementer: list) -> dict[str, dict]:
|
||||
"""
|
||||
Grupperer liste-elementer der deler samme rod op til choice-niveauet.
|
||||
Returnerer dict: rod_sti → {
|
||||
'fælles_rod': str,
|
||||
'valg': [str], # fx ['Entity', 'Individual']
|
||||
'element_navn': str, # fx 'Address'
|
||||
'samlet_rod': str # fx 'CARF_OECD...UserID.[Entity, Individual].Address'
|
||||
}
|
||||
"""
|
||||
from collections import defaultdict
|
||||
grupper: dict[str, list] = defaultdict(list)
|
||||
|
||||
for el in liste_elementer:
|
||||
dele = el.rod_sti.split(".")
|
||||
# Nøglen er: alt undtagen næstsidste led (choice-leddet)
|
||||
if len(dele) >= 2:
|
||||
nøgle = ".".join(dele[:-2]) + "." + dele[-1] # fjern choice-leddet
|
||||
grupper[nøgle].append(el)
|
||||
|
||||
resultat = {}
|
||||
for nøgle, elementer in grupper.items():
|
||||
if len(elementer) > 1:
|
||||
valg = [el.rod_sti.split(".")[-2] for el in elementer]
|
||||
fælles_rod = ".".join(elementer[0].rod_sti.split(".")[:-2])
|
||||
element_navn = elementer[0].rod_sti.split(".")[-1]
|
||||
samlet = f"{fælles_rod}.[{', '.join(valg)}].{element_navn}"
|
||||
for el in elementer:
|
||||
resultat[el.rod_sti] = {
|
||||
'fælles_rod': fælles_rod,
|
||||
'valg': valg,
|
||||
'element_navn': element_navn,
|
||||
'samlet_rod': samlet
|
||||
}
|
||||
return resultat
|
||||
|
||||
|
||||
def _anvend_forkortelser(navn: str, forkortelser: dict[str, str]) -> str:
|
||||
"""Anvender forkortelser case-insensitivt."""
|
||||
resultat = navn
|
||||
for langt, kort in forkortelser.items():
|
||||
resultat = resultat.replace(langt.lower(), kort.lower())
|
||||
resultat = resultat.replace(langt, kort)
|
||||
return resultat
|
||||
|
||||
|
||||
def _xsd_type_til_yaml(xsd_type: str) -> dict:
|
||||
"""Konverterer XSD-type til YAML-attributter."""
|
||||
t = xsd_type.lower()
|
||||
if "decimal" in t:
|
||||
return {"type": "decimal", "decimaler": 2}
|
||||
if "integer" in t or ("int" in t and "string" not in t):
|
||||
return {"type": "integer"}
|
||||
if "boolean" in t:
|
||||
return {"type": "boolean"}
|
||||
if "datetime" in t:
|
||||
return {"type": "date",
|
||||
"dato_ind": '"%Y-%m-%dT%H:%M:%S"',
|
||||
"dato_ud": '"SYBASE"'}
|
||||
if "date" in t:
|
||||
return {"type": "date",
|
||||
"dato_ind": '"%Y-%m-%d"',
|
||||
"dato_ud": '"%d-%m-%Y"'}
|
||||
return {}
|
||||
|
||||
|
||||
def _skriv_felt(felt: XsdFelt, indryk: str, forkortelser: dict,
|
||||
liste_navne: set = set(),
|
||||
fil_navne_index: dict = {}) -> list[str]:
|
||||
# Tjek på felt_ref – ikke navn – da navn kan være forkortet
|
||||
rod_del = felt.felt_ref.split(".")[0] if "." in felt.felt_ref else ""
|
||||
er_i_liste_fil = rod_del and rod_del in liste_navne
|
||||
|
||||
linjer = []
|
||||
navn = _anvend_forkortelser(felt.yaml_navn, forkortelser)
|
||||
|
||||
# if rod_del:
|
||||
# print(f" DEBUG: rod_del='{rod_del}' er_i_liste_fil={er_i_liste_fil} liste_navne_sample={list(liste_navne)[:5]}")
|
||||
|
||||
if er_i_liste_fil:
|
||||
fil_navn = fil_navne_index.get(rod_del, f"udtræk_{rod_del.lower()}.yaml")
|
||||
linjer.append(f'')
|
||||
linjer.append(f'{indryk}# OBS: {rod_del} har sin egen udtræk-fil: {fil_navn}')
|
||||
linjer.append(f'{indryk}# - navn: {navn}')
|
||||
linjer.append(f'{indryk}# felt: "{felt.felt_ref}"')
|
||||
else:
|
||||
linjer.append(f'')
|
||||
navn_str = f'"{navn}"' if navn.startswith("#") else navn
|
||||
linjer.append(f'{indryk}- navn: {navn_str}')
|
||||
linjer.append(f'{indryk} felt: "{felt.felt_ref}"')
|
||||
for k, v in _xsd_type_til_yaml(felt.xsd_type).items():
|
||||
linjer.append(f'{indryk} {k}: {v}')
|
||||
if felt.påkrævet:
|
||||
linjer.append(f'{indryk} påkrævet: true')
|
||||
if felt.er_liste:
|
||||
linjer.append(f'{indryk} join: true')
|
||||
linjer.append(f'{indryk} join_separator: ", "')
|
||||
linjer.append(f'{indryk} # OBS: Kan forekomme flere gange (maxOccurs > 1)')
|
||||
|
||||
return linjer
|
||||
|
||||
def _skriv_skabelon_ref(ref: XsdSkabelonRef, indryk: str, forkortelser: dict,
|
||||
liste_navne: set = set(), tomme_skabeloner: set = set(),
|
||||
fil_navne_index: dict = {}) -> list[str]:
|
||||
linjer = []
|
||||
prefix_navn = _anvend_forkortelser(ref.prefix_navn, forkortelser)
|
||||
|
||||
# Tjek om prefix_felt matcher et liste-element – med eller uden punktum
|
||||
rod_del = ref.prefix_felt.split(".")[0] if "." in ref.prefix_felt else ref.prefix_felt
|
||||
er_i_liste_fil = rod_del and rod_del in liste_navne
|
||||
|
||||
# Tjek om skabelonen er tom (alle felter udkommenteret)
|
||||
er_tom_skabelon = ref.skabelon_navn in tomme_skabeloner
|
||||
|
||||
if er_i_liste_fil or er_tom_skabelon:
|
||||
linjer.append(f'')
|
||||
if er_i_liste_fil:
|
||||
fil_navn = fil_navne_index.get(rod_del, f"udtræk_{rod_del.lower()}.yaml")
|
||||
linjer.append(f'{indryk}# OBS: {rod_del} har sin egen udtræk-fil: {fil_navn}')
|
||||
else:
|
||||
linjer.append(f'{indryk}# OBS: {ref.skabelon_navn} skabelonen er tom – alle felter har egne udtræk-filer')
|
||||
linjer.append(f'{indryk}# - skabelon: {ref.skabelon_navn}')
|
||||
linjer.append(f'{indryk}# prefix_felt: {ref.prefix_felt}')
|
||||
linjer.append(f'{indryk}# prefix_navn: "{prefix_navn}"')
|
||||
return linjer
|
||||
|
||||
if ref.er_choice:
|
||||
linjer.append(f'')
|
||||
linjer.append(f'{indryk}# Vælg ét alternativ (choice):')
|
||||
else:
|
||||
linjer.append(f'')
|
||||
linjer.append(f'{indryk}- skabelon: {ref.skabelon_navn}')
|
||||
linjer.append(f'{indryk} prefix_felt: {ref.prefix_felt}')
|
||||
linjer.append(f'{indryk} prefix_navn: "{prefix_navn}"')
|
||||
if ref.er_choice:
|
||||
linjer.append(f'{indryk} hvis_findes: {ref.prefix_felt}')
|
||||
if ref.er_liste:
|
||||
element_basis = ref.element_navn.lower().split(".")[-1]
|
||||
fil_navn_hint = fil_navne_index.get(
|
||||
ref.element_navn.split(".")[-1],
|
||||
f"udtræk_{element_basis}.yaml"
|
||||
)
|
||||
linjer.append(f'{indryk} # OBS: Kan forekomme flere gange (maxOccurs > 1).')
|
||||
linjer.append(f'{indryk} # Overvej om det skal udtrækkes i en separat fil.')
|
||||
linjer.append(f'{indryk} # Se evt. {fil_navn_hint}')
|
||||
return linjer
|
||||
|
||||
def _har_aktive_felter(kt: XsdKompleksType, liste_navne: set[str]) -> bool:
|
||||
"""Returnerer True hvis skabelonen har mindst ét aktivt felt
|
||||
der ikke er udkommenteret fordi det har sin egen udtræk-fil."""
|
||||
return any(
|
||||
not (isinstance(f, XsdSkabelonRef) and
|
||||
(f.prefix_felt.split(".")[0] if "." in f.prefix_felt else f.prefix_felt) in liste_navne)
|
||||
and not (isinstance(f, XsdFelt) and
|
||||
(f.felt_ref.split(".")[0] if "." in f.felt_ref else "") in liste_navne)
|
||||
for f in kt.felter
|
||||
)
|
||||
|
||||
# ============================================================
|
||||
# Generator: skabeloner.yaml
|
||||
# ============================================================
|
||||
|
||||
def generer_skabeloner_yaml(
|
||||
komplekse_typer: dict[str, XsdKompleksType],
|
||||
output_mappe: str,
|
||||
forkortelser: dict[str, str] = {},
|
||||
liste_navne: set[str] = set(),
|
||||
liste_elementer: list = [] # ← tilføj
|
||||
) -> None:
|
||||
|
||||
fælles_prefix_længde = _find_fælles_prefix(liste_elementer)
|
||||
fil_navne_index: dict[str, str] = {}
|
||||
for el in liste_elementer:
|
||||
rod_dele = el.rod_sti.split(".")
|
||||
unikke_dele = rod_dele[fælles_prefix_længde:] if len(rod_dele) > fælles_prefix_længde else rod_dele
|
||||
fil_del = _anvend_forkortelser("_".join(unikke_dele).lower(), forkortelser)
|
||||
# Behold kun første registrering per element-navn
|
||||
if el.element_navn not in fil_navne_index:
|
||||
fil_navne_index[el.element_navn] = f"udtræk_{fil_del}.yaml"
|
||||
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("# skabeloner.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append("# Auto-genereret feltskabeloner fra XSD.")
|
||||
linjer.append("# Brug prefix_felt og prefix_navn når skabelonen inkluderes.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("kolonne_skabeloner:")
|
||||
linjer.append("")
|
||||
|
||||
# Byg sæt af tomme skabeloner
|
||||
tomme_skabeloner = {
|
||||
kt.navn.replace("_Type", "").replace("Type", "").lower()
|
||||
for kt in komplekse_typer.values()
|
||||
if kt.felter and not kt.er_ekstern and not _har_aktive_felter(kt, liste_navne)
|
||||
}
|
||||
|
||||
relevante = {n: kt for n, kt in komplekse_typer.items()
|
||||
if kt.felter and not kt.er_ekstern and _har_aktive_felter(kt, liste_navne)}
|
||||
|
||||
for type_navn, kt in relevante.items():
|
||||
skabelon_navn = type_navn.replace("_Type", "").replace("Type", "").lower()
|
||||
|
||||
linjer.append(f" # {type_navn}")
|
||||
if not _har_aktive_felter(kt, liste_navne):
|
||||
linjer.append(f" {skabelon_navn}: []")
|
||||
linjer.append("")
|
||||
continue
|
||||
|
||||
linjer.append(f" {skabelon_navn}:")
|
||||
for felt in kt.felter:
|
||||
if isinstance(felt, XsdSkabelonRef):
|
||||
linjer.extend(_skriv_skabelon_ref(
|
||||
felt, indryk=" ", forkortelser=forkortelser,
|
||||
liste_navne=liste_navne, tomme_skabeloner=tomme_skabeloner,
|
||||
fil_navne_index=fil_navne_index
|
||||
))
|
||||
elif isinstance(felt, XsdFelt):
|
||||
linjer.extend(_skriv_felt(
|
||||
felt, indryk=" ", forkortelser=forkortelser,
|
||||
liste_navne=liste_navne, fil_navne_index=fil_navne_index
|
||||
))
|
||||
linjer.append("")
|
||||
|
||||
sti = os.path.join(output_mappe, "skabeloner.yaml")
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Generator: choice udtræk-filer
|
||||
# ============================================================
|
||||
|
||||
def generer_choice_udtræk_yaml(
|
||||
choice_elementer: list,
|
||||
komplekse_typer: dict,
|
||||
liste_elementer: list,
|
||||
output_mappe: str,
|
||||
prefix: str = "udtræk",
|
||||
forkortelser: dict = {},
|
||||
db_schema: str = "dbo",
|
||||
tabel_prefix: str = "",
|
||||
output_type: str = "begge",
|
||||
historik: str = "t2"
|
||||
) -> None:
|
||||
from schema_dataklasser import XsdChoiceElement, XsdSkabelonRef, XsdFelt
|
||||
|
||||
liste_navne = {el.element_navn for el in liste_elementer}
|
||||
fælles_prefix_længde = _find_fælles_prefix(liste_elementer)
|
||||
|
||||
for ce in choice_elementer:
|
||||
# Find den overliggende komplekse type
|
||||
overliggende_type = None
|
||||
rod_element_navn = ce.rod_sti.split(".")[-1]
|
||||
for el in liste_elementer:
|
||||
if el.element_navn == rod_element_navn and el.type_navn:
|
||||
type_navn_rent = el.type_navn.split(":")[-1] if ":" in el.type_navn else el.type_navn
|
||||
overliggende_type = komplekse_typer.get(type_navn_rent)
|
||||
break
|
||||
|
||||
for alt_navn, alt_skabelon, alt_sti in ce.alternativer:
|
||||
# Byg filnavn med samme prefix-logik som generer_udtræk_yaml
|
||||
rod_dele = ce.rod_sti.split(".")
|
||||
unikke_dele = rod_dele[fælles_prefix_længde:] if len(rod_dele) > fælles_prefix_længde else rod_dele
|
||||
rod_del_str = _anvend_forkortelser("_".join(unikke_dele).lower(), forkortelser)
|
||||
alt_forkortet = _anvend_forkortelser(alt_navn.lower(), forkortelser)
|
||||
fil_del = f"{rod_del_str}_{alt_forkortet}"
|
||||
fil_navn = f"{prefix}_{fil_del}.yaml"
|
||||
|
||||
hvis_finder_sti = f"{ce.element_navn}.{alt_navn}"
|
||||
prefix_navn = _anvend_forkortelser(
|
||||
f"{ce.element_navn.lower()}_{alt_navn.lower()}_".replace(".", "_"),
|
||||
forkortelser
|
||||
)
|
||||
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append(f"# {fil_navn}")
|
||||
linjer.append("#")
|
||||
linjer.append(f"# Auto-genereret udtræk for {rod_element_navn} – {alt_navn} alternativ.")
|
||||
linjer.append(f"# Rod-sti: {ce.rod_sti}")
|
||||
linjer.append(f"# Kører kun hvis {hvis_finder_sti} eksisterer i XML.")
|
||||
if ce.overliggende:
|
||||
linjer.append("#")
|
||||
linjer.append("# Udfyld nøgle-skabeloner i nøgler.yaml inden brug.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("output_filer:")
|
||||
linjer.append("")
|
||||
linjer.append(f" - rod: {ce.rod_sti}")
|
||||
linjer.append(f" hvis_findes: {hvis_finder_sti}")
|
||||
linjer.append(f" kolonner:")
|
||||
|
||||
# Nøgler
|
||||
if ce.overliggende:
|
||||
linjer.append(f" # Nøgler fra overliggende niveauer – udfyld i nøgler.yaml")
|
||||
for ov_navn in ce.overliggende:
|
||||
skabelon_navn = _anvend_forkortelser(
|
||||
f"{ov_navn.lower()}_nøgle", forkortelser
|
||||
)
|
||||
linjer.append(f" - skabelon: {skabelon_navn}")
|
||||
linjer.append("")
|
||||
|
||||
# Choice-alternativets skabelon
|
||||
linjer.append(f" # {alt_navn} felter")
|
||||
linjer.append(f" - skabelon: {alt_skabelon}")
|
||||
linjer.append(f" prefix_felt: {ce.element_navn}.{alt_navn}")
|
||||
linjer.append(f' prefix_navn: "{prefix_navn}"')
|
||||
|
||||
# Øvrige felter fra den overliggende type
|
||||
if overliggende_type and overliggende_type.felter:
|
||||
ikke_choice_felter = [
|
||||
f for f in overliggende_type.felter
|
||||
if not (isinstance(f, XsdSkabelonRef) and f.er_choice)
|
||||
and not (isinstance(f, XsdFelt) and f.er_choice)
|
||||
]
|
||||
if ikke_choice_felter:
|
||||
linjer.append("")
|
||||
linjer.append(f" # Øvrige felter fra {rod_element_navn}")
|
||||
for felt in ikke_choice_felter:
|
||||
if isinstance(felt, XsdSkabelonRef):
|
||||
linjer.extend(_skriv_skabelon_ref(
|
||||
felt, " ", forkortelser, liste_navne
|
||||
))
|
||||
elif isinstance(felt, XsdFelt):
|
||||
linjer.extend(_skriv_felt(
|
||||
felt, " ", forkortelser, liste_navne
|
||||
))
|
||||
|
||||
linjer.append("")
|
||||
linjer.append(f" outputs:")
|
||||
|
||||
if output_type in ("fil", "begge"):
|
||||
linjer.append(f" - fil_navn: \"{fil_del}_{{yyyy}}{{mm}}{{dd}}.txt\"")
|
||||
linjer.append(f" overskrifter: true")
|
||||
linjer.append("")
|
||||
|
||||
if output_type in ("tabel", "begge"):
|
||||
tabel_navn = f"{tabel_prefix}{fil_del}"
|
||||
linjer.append(f" - tabel:")
|
||||
linjer.append(f" staging: {db_schema}.{tabel_navn}_tmp")
|
||||
linjer.append(f" blivende: {db_schema}.{tabel_navn}")
|
||||
linjer.append(f" historik: {historik}")
|
||||
linjer.append(f" # TODO: Angiv forretningsnøgler")
|
||||
linjer.append(f" forretnings_nøgler: [__NØGLE__]")
|
||||
linjer.append(f" tekniske_nøgler: [løbenummer]")
|
||||
linjer.append(f" ekstra_kolonner:")
|
||||
linjer.append(f" - navn: løbenummer")
|
||||
linjer.append(f' ddl_type: "int identity"')
|
||||
linjer.append(f" placering: start")
|
||||
linjer.append("")
|
||||
|
||||
sti = os.path.join(output_mappe, fil_navn)
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
# ============================================================
|
||||
# Generator: nøgler.yaml
|
||||
# ============================================================
|
||||
|
||||
def generer_nøgler_yaml(
|
||||
liste_elementer: list[XsdListeElement],
|
||||
output_mappe: str,
|
||||
forkortelser: dict[str, str] = {}
|
||||
) -> None:
|
||||
# Find unikke overliggende niveauer
|
||||
overliggende: dict[str, str] = {}
|
||||
for el in liste_elementer:
|
||||
rod_dele = el.rod_sti.split(".")
|
||||
for ov_navn in el.overliggende:
|
||||
if ov_navn not in overliggende:
|
||||
try:
|
||||
idx = rod_dele.index(ov_navn)
|
||||
overliggende[ov_navn] = ".".join(rod_dele[:idx + 1])
|
||||
except ValueError:
|
||||
overliggende[ov_navn] = f"__{ov_navn.upper()}__"
|
||||
|
||||
if not overliggende:
|
||||
return
|
||||
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("# nøgler.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append("# Nøgle-skabeloner der binder udtræk-filer til overliggende niveauer.")
|
||||
linjer.append("# UDFYLD: Erstat __FELT__ med den korrekte forretningsnøgle.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("kolonne_skabeloner:")
|
||||
linjer.append("")
|
||||
|
||||
for ov_navn, ov_rod_sti in overliggende.items():
|
||||
skabelon_navn = _anvend_forkortelser(f"{ov_navn.lower()}_nøgle", forkortelser)
|
||||
linjer.append(f" # Nøgle fra {ov_navn}-niveau")
|
||||
linjer.append(f" # TODO: Angiv den korrekte forretningsnøgle for {ov_navn}")
|
||||
linjer.append(f" {skabelon_navn}:")
|
||||
linjer.append(f" - navn: __FELT__")
|
||||
linjer.append(f" felt: \"__FELT__\" # fx \"@id\" eller \"nr\"")
|
||||
linjer.append(f" rod: {ov_rod_sti}")
|
||||
linjer.append("")
|
||||
|
||||
sti = os.path.join(output_mappe, "nøgler.yaml")
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
|
||||
def generer_udtræk_yaml(
|
||||
liste_elementer: list[XsdListeElement],
|
||||
komplekse_typer: dict[str, XsdKompleksType],
|
||||
output_mappe: str,
|
||||
prefix: str = "udtræk",
|
||||
min_felter: int = 1,
|
||||
forkortelser: dict[str, str] = {},
|
||||
db_schema: str = "dbo",
|
||||
tabel_prefix: str = "",
|
||||
output_type: str = "begge",
|
||||
historik: str = "t2"
|
||||
) -> None:
|
||||
|
||||
# Beregn fælles prefix på tværs af alle rod-stier
|
||||
fælles_prefix_længde = _find_fælles_prefix(liste_elementer)
|
||||
|
||||
fil_navne_index: dict[str, str] = {}
|
||||
for el in liste_elementer:
|
||||
rod_dele = el.rod_sti.split(".")
|
||||
unikke_dele = rod_dele[fælles_prefix_længde:] if len(rod_dele) > fælles_prefix_længde else rod_dele
|
||||
fil_del = _anvend_forkortelser("_".join(unikke_dele).lower(), forkortelser)
|
||||
if el.element_navn not in fil_navne_index:
|
||||
fil_navne_index[el.element_navn] = f"udtræk_{fil_del}.yaml"
|
||||
|
||||
# Byg choice-grupper til kommentar-generering (ikke filnavne)
|
||||
choice_grupper = _byg_choice_grupper(liste_elementer)
|
||||
|
||||
# Byg sæt af liste-element navne til udkommentering
|
||||
liste_navne = {el.element_navn for el in liste_elementer}
|
||||
|
||||
for el in liste_elementer:
|
||||
|
||||
# Byg unikt filnavn fra rod-stien minus fælles prefix
|
||||
rod_dele = el.rod_sti.split(".")
|
||||
unikke_dele = rod_dele[fælles_prefix_længde:] if len(rod_dele) > fælles_prefix_længde else rod_dele
|
||||
fil_del = _anvend_forkortelser(
|
||||
"_".join(unikke_dele).lower(), forkortelser
|
||||
)
|
||||
fil_navn = f"{prefix}_{fil_del}.yaml"
|
||||
|
||||
gruppe = choice_grupper.get(el.rod_sti)
|
||||
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append(f"# {fil_navn}")
|
||||
linjer.append("#")
|
||||
linjer.append(f"# Auto-genereret udtræk for {el.element_navn}.")
|
||||
linjer.append(f"# Rod-sti: {el.rod_sti}")
|
||||
if el.er_simpel:
|
||||
linjer.append("#")
|
||||
linjer.append(f"# OBS: Dette element er simpelt ({el.element_navn} har få felter).")
|
||||
linjer.append(f"# Det er foldet ind som join-kolonne i skabelon: {el.join_i_skabelon}")
|
||||
linjer.append(f"# Denne fil er udkommenteret i config – fjern udkommentering for fuldt udtræk.")
|
||||
if el.overliggende:
|
||||
linjer.append("#")
|
||||
linjer.append("# Udfyld nøgle-skabeloner i nøgler.yaml inden brug.")
|
||||
if gruppe:
|
||||
linjer.append("#")
|
||||
linjer.append("# OBS: Dette element forekommer i flere choice-grene:")
|
||||
for valg in gruppe['valg']:
|
||||
sti = f"{gruppe['fælles_rod']}.{valg}.{gruppe['element_navn']}"
|
||||
linjer.append(f"# - {sti}")
|
||||
linjer.append("# Disse kan samles i én fil ved at ændre rod til en rod med varianter:")
|
||||
linjer.append(f"# rod: {gruppe['samlet_rod']}")
|
||||
linjer.append("# Tilføj evt. en kolonne til at skelne grenene:")
|
||||
linjer.append("# - navn: bruger_type")
|
||||
linjer.append("# type: rod_variant")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("output_filer:")
|
||||
linjer.append("")
|
||||
linjer.append(f" - rod: {el.rod_sti}")
|
||||
linjer.append(f" kolonner:")
|
||||
|
||||
# Nøgler
|
||||
if el.overliggende:
|
||||
linjer.append(f" # Nøgler fra overliggende niveauer – udfyld i nøgler.yaml")
|
||||
for ov_navn in el.overliggende:
|
||||
skabelon_navn = _anvend_forkortelser(
|
||||
f"{ov_navn.lower()}_nøgle", forkortelser
|
||||
)
|
||||
linjer.append(f" - skabelon: {skabelon_navn}")
|
||||
linjer.append("")
|
||||
|
||||
# Felter fra elementets type eller inline felter
|
||||
type_navn_rent = _strip_prefix(el.type_navn) if el.type_navn else ""
|
||||
kt = komplekse_typer.get(type_navn_rent)
|
||||
|
||||
if kt and kt.felter and not kt.er_ekstern:
|
||||
skabelon_navn = type_navn_rent.replace("_Type", "").replace("Type", "").lower()
|
||||
linjer.append(f" # Felter fra {el.element_navn} – se skabeloner.yaml")
|
||||
linjer.append(f" - skabelon: {skabelon_navn}")
|
||||
elif el.felter:
|
||||
linjer.append(f" # Felter fra {el.element_navn}")
|
||||
for felt in el.felter:
|
||||
if isinstance(felt, XsdSkabelonRef):
|
||||
linjer.extend(_skriv_skabelon_ref(
|
||||
felt, " ", forkortelser, liste_navne,
|
||||
fil_navne_index=fil_navne_index
|
||||
))
|
||||
elif isinstance(felt, XsdFelt):
|
||||
linjer.extend(_skriv_felt(
|
||||
felt, " ", forkortelser, liste_navne # ← tilføj liste_navne
|
||||
))
|
||||
elif el.er_simpel:
|
||||
linjer.append(f" # Feltets egen værdi")
|
||||
linjer.append(f" - navn: {el.element_navn}")
|
||||
linjer.append(f" felt: \".\"")
|
||||
|
||||
linjer.append("")
|
||||
linjer.append(f" outputs:")
|
||||
|
||||
if output_type in ("fil", "begge"):
|
||||
linjer.append(f" - fil_navn: \"{fil_del}_{{yyyy}}{{mm}}{{dd}}.txt\"")
|
||||
linjer.append(f" overskrifter: true")
|
||||
linjer.append("")
|
||||
|
||||
if output_type in ("tabel", "begge"):
|
||||
tabel_navn = f"{tabel_prefix}{fil_del}"
|
||||
linjer.append(f" - tabel:")
|
||||
linjer.append(f" staging: {db_schema}.{tabel_navn}_tmp")
|
||||
linjer.append(f" blivende: {db_schema}.{tabel_navn}")
|
||||
linjer.append(f" historik: {historik}")
|
||||
linjer.append(f" # TODO: Angiv forretningsnøgler")
|
||||
linjer.append(f" forretnings_nøgler: [__NØGLE__]")
|
||||
linjer.append(f" tekniske_nøgler: [løbenummer]")
|
||||
linjer.append(f" ekstra_kolonner:")
|
||||
linjer.append(f" - navn: løbenummer")
|
||||
linjer.append(f' ddl_type: "int identity"')
|
||||
linjer.append(f" placering: start")
|
||||
linjer.append("")
|
||||
linjer.append("")
|
||||
|
||||
sti = os.path.join(output_mappe, fil_navn)
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
# ============================================================
|
||||
# Generator: config_xxx.yaml
|
||||
# ============================================================
|
||||
|
||||
def generer_hoved_config(
|
||||
liste_elementer: list[XsdListeElement],
|
||||
xsd_sti: str,
|
||||
output_mappe: str,
|
||||
prefix: str = "udtræk",
|
||||
forkortelser: dict[str, str] = {}
|
||||
) -> None:
|
||||
xsd_basis = os.path.splitext(os.path.basename(xsd_sti))[0]
|
||||
|
||||
# Beregn fælles prefix – samme logik som generer_udtræk_yaml
|
||||
fælles_prefix_længde = _find_fælles_prefix(liste_elementer)
|
||||
|
||||
linjer = []
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append(f"# config_{xsd_basis}.yaml")
|
||||
linjer.append("#")
|
||||
linjer.append("# Auto-genereret hoved-config.")
|
||||
linjer.append("# Tilpas config-sektionen med korrekte stier og indstillinger.")
|
||||
linjer.append("# =============================================================================")
|
||||
linjer.append("")
|
||||
linjer.append("include:")
|
||||
linjer.append(" - skabeloner.yaml")
|
||||
linjer.append(" - nøgler.yaml")
|
||||
linjer.append("")
|
||||
|
||||
normale = []
|
||||
simple = []
|
||||
|
||||
for el in liste_elementer:
|
||||
rod_dele = el.rod_sti.split(".")
|
||||
unikke_dele = rod_dele[fælles_prefix_længde:] if len(rod_dele) > fælles_prefix_længde else rod_dele
|
||||
fil_del = _anvend_forkortelser(
|
||||
"_".join(unikke_dele).lower(), forkortelser
|
||||
)
|
||||
fil = f"{prefix}_{fil_del}.yaml"
|
||||
if el.er_simpel:
|
||||
simple.append((fil, el))
|
||||
else:
|
||||
normale.append((fil, el))
|
||||
|
||||
for fil, _ in normale:
|
||||
linjer.append(f" - {fil}")
|
||||
|
||||
if simple:
|
||||
linjer.append("")
|
||||
linjer.append(" # Simple lister – foldet ind som join-kolonner i skabelonerne:")
|
||||
linjer.append(" # Fjern udkommentering for fuldt udtræk i stedet for join.")
|
||||
for fil, el in simple:
|
||||
linjer.append(f" # - {fil} (join i skabelon: {el.join_i_skabelon})")
|
||||
|
||||
linjer.append("")
|
||||
linjer.append("config:")
|
||||
linjer.append(f" input_fil: {xsd_basis}.xml")
|
||||
linjer.append(" output_path: __OUTPUT_PATH__")
|
||||
linjer.append(" logfil: log_{yyyy}{mm}{dd}.txt")
|
||||
linjer.append(" log_niveau: info")
|
||||
linjer.append(" log_output: begge")
|
||||
linjer.append(" encoding: utf-8")
|
||||
linjer.append(' separator: "\\t"')
|
||||
linjer.append(" skrivetilstand: w")
|
||||
linjer.append(' dato_ind: "%Y-%m-%d"')
|
||||
linjer.append(' dato_ud: "%d-%m-%Y"')
|
||||
|
||||
sti = os.path.join(output_mappe, f"config_{xsd_basis}.yaml")
|
||||
with open(sti, "w", encoding="utf-8") as f:
|
||||
f.write("\n".join(linjer))
|
||||
print(f" Skrev: {sti}")
|
||||
|
||||
def _strip_prefix(type_navn: str) -> str:
|
||||
if not type_navn or ":" not in type_navn:
|
||||
return type_navn
|
||||
prefix, lokalt = type_navn.split(":", 1)
|
||||
if prefix in ("xs", "xsd"):
|
||||
return type_navn
|
||||
return lokalt
|
||||
Reference in New Issue
Block a user