diff --git a/Manual/.~lock.manual_kap15_v3.odt# b/Manual/.~lock.manual_kap15_v3.odt# new file mode 100644 index 0000000..932ab2d --- /dev/null +++ b/Manual/.~lock.manual_kap15_v3.odt# @@ -0,0 +1 @@ +,carsten,carsten-mint,04.04.2026 15:42,file:///home/carsten/.config/libreoffice/4; \ No newline at end of file diff --git a/Manual/manual_kap15_v3.odt b/Manual/manual_kap15_v3.odt new file mode 100644 index 0000000..d3ba061 Binary files /dev/null and b/Manual/manual_kap15_v3.odt differ diff --git a/udpak_semistruktur.py b/udpak_semistruktur.py index afa06ca..435ca18 100644 --- a/udpak_semistruktur.py +++ b/udpak_semistruktur.py @@ -70,7 +70,7 @@ def _kør_udtræk(config: dict, global_config: dict) -> None: # Opret DB-forbindelse hvis der er tabel-output har_tabel_output = any( - cfg.get("type") == "tabel" + cfg.get("type") in ("tabel", "tabel_avanceret") for cfg in config.get("output_filer", []) ) conn = None @@ -148,6 +148,24 @@ def _kør_udtræk(config: dict, global_config: dict) -> None: if skrevet: fejl_filer_skrevet.append(skrevet) + elif cfg.get("type") == "tabel_avanceret": + tabel_cfg = cfg.get("tabel", {}) + staging = tabel_cfg.get("staging") + if staging: + kolonner = [k["navn"] for k in cfg["kolonner"]] + indsatte, fejlede = insert_rows_ase( + conn, staging, kolonner, tmp_data["rækker"] + ) + logger.info(f"DB: {indsatte} rækker indsat i {staging}") + if fejlede: + logger.warning(f"DB: {len(fejlede)} rækker fejlede i {staging}") + fejl_base = os.path.join(global_config["output_path"], + staging.replace(".", "_")) + skrevet = _skriv_fejl_fil(tmp_data, global_config.get("fejl_fil_ext"), + fejl_base, cfg["kolonner"], separator, encoding) + if skrevet: + fejl_filer_skrevet.append(skrevet) + # Tjek om alle output-filer gav 0 rækker if samlet_antal_rækker == 0: logger.warning("0 rækker genereret i alle output-filer for dette input.") diff --git a/udpak_semistruktur/config.py b/udpak_semistruktur/config.py index 67f87de..1c0ab27 100644 --- a/udpak_semistruktur/config.py +++ b/udpak_semistruktur/config.py @@ -115,10 +115,12 @@ def _expand_output_groups_new_only(cfg: dict) -> dict: ctype = "fil" elif "tabel_navn" in child: ctype = "tabel" + elif "tabel" in child and isinstance(child["tabel"], dict): + ctype = "tabel_avanceret" else: raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} mangler 'type' og kan ikke udledes.") - if ctype not in ("fil", "tabel"): + if ctype not in ("fil", "tabel", "tabel_avanceret"): raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} har ukendt type '{ctype}'.") merged = _deep_merge_dicts(common, child) @@ -126,11 +128,11 @@ def _expand_output_groups_new_only(cfg: dict) -> dict: # Sikr præcis én destination has_file = "fil_navn" in merged has_table = "tabel_navn" in merged + has_avanceret = "tabel" in merged and isinstance(merged.get("tabel"), dict) if has_file and has_table: raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} må ikke have både 'fil_navn' og 'tabel_navn'.") - if not has_file and not has_table: - raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal have enten 'fil_navn' eller 'tabel_navn'.") - + if not has_file and not has_table and not has_avanceret: + raise ValueError(f"outputs-element #{cidx+1} i gruppe #{idx+1} skal have enten 'fil_navn', 'tabel_navn' eller 'tabel'.") # 'overskrifter' gælder kun for filer; brug gruppens default hvis ikke sat if ctype == "fil": if "overskrifter" not in merged and group_default_headers is not None: @@ -276,13 +278,15 @@ def valider_yaml(yaml_file_path: str) -> dict: for file_cfg in config["output_filer"]: if not isinstance(file_cfg, dict): raise ValueError("Hvert output-element skal være et objekt.") - if ("fil_navn" in file_cfg) == ("tabel_navn" in file_cfg): - raise ValueError("Hvert output skal have præcis én af 'fil_navn' eller 'tabel_navn'.") + har_avanceret = isinstance(file_cfg.get("tabel"), dict) + if not har_avanceret: + if ("fil_navn" in file_cfg) == ("tabel_navn" in file_cfg): + raise ValueError("Hvert output skal have præcis én af 'fil_navn' eller 'tabel_navn'.") if "rod" not in file_cfg: raise ValueError("Hvert output skal have en 'rod'.") if "kolonner" not in file_cfg or not isinstance(file_cfg["kolonner"], list): raise ValueError("Hvert output skal have 'kolonner' som en liste.") - if "tabel_navn" in file_cfg: + if "tabel_navn" in file_cfg or har_avanceret: file_cfg.pop("overskrifter", None) # kun relevant for filer if "hvis_findes" in file_cfg: diff --git a/udpak_semistruktur/ddl.py b/udpak_semistruktur/ddl.py index 44cff2c..27092f4 100644 --- a/udpak_semistruktur/ddl.py +++ b/udpak_semistruktur/ddl.py @@ -31,6 +31,61 @@ from udpak_semistruktur.logger import hent_logger logger = hent_logger(__name__) +def _byg_staging_kolonner(file_conf: dict) -> list: + """ + Returnerer kolonner til staging-tabellen. + Det er præcis kolonnerne fra YAML – ingen ekstra. + """ + return file_conf.get("kolonner", []) + + +def _byg_blivende_kolonner(file_conf: dict) -> list: + """ + Returnerer kolonner til den blivende tabel. + Sammensætter: start-ekstra + yaml-kolonner + slut-ekstra. + """ + tabel_cfg = file_conf.get("tabel", {}) + ekstra = tabel_cfg.get("ekstra_kolonner", []) + + start_kolonner = [k for k in ekstra if k.get("placering") == "start"] + slut_kolonner = [k for k in ekstra if k.get("placering") == "slut"] + yaml_kolonner = file_conf.get("kolonner", []) + + return start_kolonner + yaml_kolonner + slut_kolonner + +def _map_ekstra_kolonne_til_sql(ekstra_kol: dict) -> str: + """ + Konverterer en ekstra-kolonne til en SQL-kolonnelinje. + + Tre mulige former: + 1) Beregnet kolonne: "navn" ddl_type AS + – bruges automatisk når default indeholder case, select eller convert + 2) Med default: "navn" ddl_type DEFAULT NULL + 3) Simpel: "navn" ddl_type NULL + + Identity-kolonner sættes altid til NOT NULL. + """ + navn = ekstra_kol["navn"] + ddl_type = ekstra_kol.get("ddl_type", "VARCHAR(50)") + default = ekstra_kol.get("default") + påkrævet = ekstra_kol.get("påkrævet", False) + + # Identity-kolonner er altid NOT NULL + er_identity = "identity" in ddl_type.lower() + not_null = "NOT NULL" if (påkrævet or er_identity) else "NULL" + + if default is not None: + # Beregnet kolonne hvis default indeholder udtryk der kræver AS-syntaks + beregnet_nøgleord = ("case ", "select ", "convert(", "(select ") + er_beregnet = any(kw in default.lower() for kw in beregnet_nøgleord) + + if er_beregnet: + return f' "{navn}" {ddl_type} AS {default}' + + return f' "{navn}" {ddl_type} DEFAULT {default} {not_null}' + + return f' "{navn}" {ddl_type} {not_null}' + # ------------------------------------------------------------ # CLI wiring # ------------------------------------------------------------ @@ -63,6 +118,10 @@ def _map_yaml_type_to_ase(col: dict, dato_ud_global = "%Y-%m-%d") -> str: - decimaler: int (scale til decimal) Fallback for ukendt/uden type: VARCHAR(255) """ + # Ekstra-kolonner har ddl_type direkte – returner den rå type + if "ddl_type" in col: + return col["ddl_type"] + t = str(col.get("type", "string")).lower() length = col.get("max_længde", col.get("length", col.get("truncate"))) precision = col.get("precision", 18) @@ -279,9 +338,9 @@ def run_ddl_mode(args, config: Dict[str, Any], global_config: Dict[str, Any]) -> """ Executes the full DDL-only flow and writes files. - --tmp flaget styrer om der genereres én eller to tabeller: - - Uden --tmp: kun den tabel der er nævnt i YAML - - Med --tmp: både den nævnte tabel og dens modpart (base↔tmp) + To flows: + - Nyt flow: file_conf har 'tabel'-sektion med staging/blivende/historik + - Gammelt flow: file_conf har kun 'tabel_navn' (bagudkompatibelt) """ outdir = os.path.join(global_config["output_path"], "sql") @@ -293,93 +352,390 @@ def run_ddl_mode(args, config: Dict[str, Any], global_config: Dict[str, Any]) -> output_filer = config.get("output_filer", []) for file_conf in output_filer: + tabel_cfg = file_conf.get("tabel", {}) + har_nyt_flow = bool(tabel_cfg.get("staging") or tabel_cfg.get("blivende")) tabel = file_conf.get("tabel_navn") - if not tabel: + + # Spring over hvis hverken nyt eller gammelt flow har tabelinfo + if not har_nyt_flow and not tabel: continue try: - base_tabel, tmp_tabel = split_base_tmp(tabel) - yaml_is_tmp = tabel.lower().endswith("_tmp") - skal_lave_tmp = bool(getattr(args, "tmp", False)) + # ============================================================= + # NYT FLOW – tabel-sektion med staging/blivende/historik + # ============================================================= + if har_nyt_flow: + staging = tabel_cfg.get("staging") + blivende = tabel_cfg.get("blivende") + historik = tabel_cfg.get("historik") - # --------------------------------------------------------- - # 1) Primær DDL – brug tabelnavnet præcis som det er i YAML - # --------------------------------------------------------- - primær_conf = deepcopy(file_conf) - ddl_sql_primær = generate_create_table_sql(primær_conf, global_config) - samlet_sql_indhold.append(f"-- Tabel: {tabel}\n{ddl_sql_primær}\n") + # 1) Staging DDL + if staging: + sql = generate_create_staging_sql(file_conf, global_config) + sti = os.path.join(outdir, _default_ddl_filename(staging)) + with open(sti, "w", encoding="utf-8") as f: + f.write(sql) + samlet_sql_indhold.append(f"-- Staging: {staging}\n{sql}\n") + logger.info(f"[DDL] Skrev staging: {sti}") + antal += 1 - ddl_primær_name = file_conf.get("ddl_fil_navn") or _default_ddl_filename(tabel) - ddl_primær_name = generer_filnavn(ddl_primær_name, global_config) - ddl_primær_path = os.path.join(outdir, ddl_primær_name) + # 2) Blivende DDL + if blivende: + sql = generate_create_blivende_sql(file_conf, global_config) + sti = os.path.join(outdir, _default_ddl_filename(blivende)) + with open(sti, "w", encoding="utf-8") as f: + f.write(sql) + samlet_sql_indhold.append(f"-- Blivende: {blivende}\n{sql}\n") + logger.info(f"[DDL] Skrev blivende: {sti}") + antal += 1 - with open(ddl_primær_path, "w", encoding="utf-8") as f: - f.write(ddl_sql_primær) + # 3) Indexes + if blivende: + ix_sql = generate_indexes_sql(file_conf) + if ix_sql: + ix_sti = os.path.join(outdir, f"{_safe_name(blivende)}_indexes.sql") + with open(ix_sti, "w", encoding="utf-8") as f: + f.write(ix_sql) + samlet_sql_indhold.append(f"-- Indexes: {blivende}\n{ix_sql}\n") + logger.info(f"[DDL] Skrev indexes: {ix_sti}") - logger.info(f"[DDL] Skrev {ddl_primær_path}") - antal += 1 + # 4) Flyt-scripts + if getattr(args, "flyt", False) or getattr(args, "flyt_kort", False): + if historik == "t2": + flyt_sql = generate_t2_flyt_sql(file_conf) + elif historik == "t1": + flyt_sql = generate_t1_flyt_sql(file_conf) + else: + flyt_sql = generate_t1_flyt_sql(file_conf) # default t1 - # --------------------------------------------------------- - # 2) Sekundær DDL – kun hvis --tmp er givet - # YAML er base → sekundær er tmp - # YAML er _tmp → sekundær er base - # --------------------------------------------------------- - - if skal_lave_tmp: - sekundær_tabel = tmp_tabel if not yaml_is_tmp else base_tabel - sekundær_conf = deepcopy(file_conf) - sekundær_conf["tabel_navn"] = sekundær_tabel + base_navn = blivende or staging + flyt_sti = os.path.join(outdir, _default_flyt_filename(base_navn)) + with open(flyt_sti, "w", encoding="utf-8") as f: + f.write(flyt_sql) + samlet_flyt_indhold.append(f"-- FLYT: {staging} -> {blivende}\n{flyt_sql}\n") + logger.info(f"[DDL] Skrev flyt: {flyt_sti}") - ddl_sql_sekundær = generate_create_table_sql(sekundær_conf, global_config) - samlet_sql_indhold.append(f"-- Tabel: {sekundær_tabel}\n{ddl_sql_sekundær}\n") + # ============================================================= + # GAMMELT FLOW – kun tabel_navn (bagudkompatibelt) + # ============================================================= + else: + base_tabel, tmp_tabel = split_base_tmp(tabel) + yaml_is_tmp = tabel.lower().endswith("_tmp") + skal_lave_tmp = bool(getattr(args, "tmp", False)) - ddl_sekundær_name = generer_filnavn(_default_ddl_filename(sekundær_tabel), global_config) - ddl_sekundær_path = os.path.join(outdir, ddl_sekundær_name) + # Primær DDL + primær_conf = deepcopy(file_conf) + ddl_sql = generate_create_table_sql(primær_conf, global_config) + samlet_sql_indhold.append(f"-- Tabel: {tabel}\n{ddl_sql}\n") - with open(ddl_sekundær_path, "w", encoding="utf-8") as f: - f.write(ddl_sql_sekundær) - - logger.info(f"[DDL] Skrev {ddl_sekundær_path}") + ddl_navn = file_conf.get("ddl_fil_navn") or _default_ddl_filename(tabel) + ddl_navn = generer_filnavn(ddl_navn, global_config) + ddl_sti = os.path.join(outdir, ddl_navn) + with open(ddl_sti, "w", encoding="utf-8") as f: + f.write(ddl_sql) + logger.info(f"[DDL] Skrev {ddl_sti}") antal += 1 - # --------------------------------------------------------- - # 3) Flyt scripts – kun hvis --flyt er givet - # --------------------------------------------------------- - if getattr(args, "flyt", False): - _skriv_flyt_scripts( - tabel, base_tabel, tmp_tabel, file_conf, outdir, - generate_insert_move_sql, samlet_flyt_indhold - ) + # Sekundær DDL + if skal_lave_tmp: + sek_tabel = tmp_tabel if not yaml_is_tmp else base_tabel + sek_conf = deepcopy(file_conf) + sek_conf["tabel_navn"] = sek_tabel + sek_sql = generate_create_table_sql(sek_conf, global_config) + samlet_sql_indhold.append(f"-- Tabel: {sek_tabel}\n{sek_sql}\n") + sek_navn = generer_filnavn(_default_ddl_filename(sek_tabel), global_config) + sek_sti = os.path.join(outdir, sek_navn) + with open(sek_sti, "w", encoding="utf-8") as f: + f.write(sek_sql) + logger.info(f"[DDL] Skrev {sek_sti}") + antal += 1 - # --------------------------------------------------------- - # 4) Flyt scripts kort – kun hvis --flyt_kort er givet - # --------------------------------------------------------- - if getattr(args, "flyt_kort", False): - _skriv_flyt_scripts( - tabel, base_tabel, tmp_tabel, file_conf, outdir, - generate_insert_move_sql_short, samlet_flyt_indhold - ) + # Flyt-scripts + if getattr(args, "flyt", False): + _skriv_flyt_scripts( + tabel, base_tabel, tmp_tabel, file_conf, outdir, + generate_insert_move_sql, samlet_flyt_indhold + ) + if getattr(args, "flyt_kort", False): + _skriv_flyt_scripts( + tabel, base_tabel, tmp_tabel, file_conf, outdir, + generate_insert_move_sql_short, samlet_flyt_indhold + ) except Exception as e: - logger.error(f"[DDL] Fejl for {tabel}: {e}") + logger.error(f"[DDL] Fejl: {e}") raise - # --------------------------------------------------------- # Samlede filer - # --------------------------------------------------------- if antal > 0: samlet_sti = os.path.join(outdir, "sql_samlet.sql") - with open(samlet_sti, "w", encoding="utf-8") as f_alt: - f_alt.write("\n".join(samlet_sql_indhold)) + with open(samlet_sti, "w", encoding="utf-8") as f: + f.write("\n".join(samlet_sql_indhold)) logger.info(f"[DDL] Skrev samlet fil: {samlet_sti}") - if (getattr(args, "flyt", False) or getattr(args, "flyt_kort", False)) and samlet_flyt_indhold: + if samlet_flyt_indhold: samlet_flyt_sti = os.path.join(outdir, "sql_flyt_samlet.sql") - with open(samlet_flyt_sti, "w", encoding="utf-8") as f_flyt_alt: - f_flyt_alt.write("\n".join(samlet_flyt_indhold)) + with open(samlet_flyt_sti, "w", encoding="utf-8") as f: + f.write("\n".join(samlet_flyt_indhold)) logger.info(f"[FLYT] Skrev samlet fil: {samlet_flyt_sti}") logger.info(f"[DDL] FÆRDIG: {antal} fil(er) genereret.") else: logger.info("[DDL] Ingen DDL genereret.") + +def generate_create_staging_sql(file_conf: dict, global_config: dict) -> str: + """ + Genererer CREATE TABLE DDL for staging-tabellen. + Kun kolonner fra YAML – ingen ekstra_kolonner. + Tabelnavnet hentes fra tabel.staging. + """ + tabel_cfg = file_conf.get("tabel", {}) + table = tabel_cfg.get("staging") + if not table: + raise ValueError("Kan ikke generere staging DDL: 'tabel.staging' mangler.") + + cols = _byg_staging_kolonner(file_conf) + if not cols: + raise ValueError(f"Kan ikke generere DDL for {table}: 'kolonner' er tom.") + + col_lines = [] + for col in cols: + name = col["navn"] + sql_type = _map_yaml_type_to_ase(col, global_config.get("dato_ud", "%Y-%m-%d")) + not_null = "NOT NULL" if col.get("påkrævet") else "NULL" + col_lines.append(f' "{name}" {sql_type} {not_null}') + + cols_block = ",\n".join(col_lines) + table_only = table.split(".")[-1] + + drop_part = ( + f"IF EXISTS (SELECT 1 FROM sysobjects WHERE name = '{table_only}' AND type = 'U')\n" + f"BEGIN\n" + f" DROP TABLE {table}\n" + f"END\nGO\n\n" + ) + create_part = ( + f"CREATE TABLE {table} (\n" + f"{cols_block}\n" + f");\nGO\n" + ) + return drop_part + create_part + + +def generate_create_blivende_sql(file_conf: dict, global_config: dict) -> str: + """ + Genererer CREATE TABLE DDL for den blivende tabel. + Kolonner: start-ekstra + yaml-kolonner + slut-ekstra. + Tabelnavnet hentes fra tabel.blivende. + Tilføjer PRIMARY KEY på tekniske_nøgler hvis angivet. + """ + tabel_cfg = file_conf.get("tabel", {}) + table = tabel_cfg.get("blivende") + if not table: + raise ValueError("Kan ikke generere blivende DDL: 'tabel.blivende' mangler.") + + tekniske_nøgler = tabel_cfg.get("tekniske_nøgler", []) + forretnings_nøgler = tabel_cfg.get("forretnings_nøgler", []) + + alle_kolonner = _byg_blivende_kolonner(file_conf) + if not alle_kolonner: + raise ValueError(f"Kan ikke generere DDL for {table}: ingen kolonner.") + + col_lines = [] + pk_cols = list(tekniske_nøgler) # tekniske nøgler → PRIMARY KEY + + for col in alle_kolonner: + name = col["navn"] + if "ddl_type" in col: + # Ekstra-kolonne – brug _map_ekstra_kolonne_til_sql + col_lines.append(_map_ekstra_kolonne_til_sql(col)) + else: + # Normal YAML-kolonne + sql_type = _map_yaml_type_to_ase(col, global_config.get("dato_ud", "%Y-%m-%d")) + not_null = "NOT NULL" if col.get("påkrævet") else "NULL" + col_lines.append(f' "{name}" {sql_type} {not_null}') + + # PRIMARY KEY + pk_line = "" + if pk_cols: + cols_list = ", ".join(f'"{c}"' for c in pk_cols) + pk_line = f",\n PRIMARY KEY ({cols_list})" + + cols_block = ",\n".join(col_lines) + pk_line + table_only = table.split(".")[-1] + schema = table.split(".")[0] if "." in table else "dbo" + + drop_part = ( + f"-- Omdøb eksisterende tabel inden oprettelse af ny:\n" + f"IF EXISTS (SELECT 1 FROM sysobjects WHERE name = '{table_only}' AND type = 'U')\n" + f"BEGIN\n" + f" EXEC sp_rename '{table}', '{table_only}_gammel'\n" + f"END\n" + f"GO\n\n" + f"-- DROP gammel tabel når du er sikker:\n" + f"-- IF EXISTS (SELECT 1 FROM sysobjects WHERE name = '{table_only}_gammel' AND type = 'U')\n" + f"-- BEGIN\n" + f"-- DROP TABLE {schema}.{table_only}_gammel\n" + f"-- END\n" + f"-- GO\n\n" + ) + create_part = ( + f"CREATE TABLE {table} (\n" + f"{cols_block}\n" + f");\nGO\n" + ) + return drop_part + create_part + +def generate_indexes_sql(file_conf: dict) -> str: + """ + Genererer CREATE INDEX statements for den blivende tabel. + + Automatiske indexes: + - Ét index på forretnings_nøgler (hvis angivet) + - Ét index på forretnings_nøgler + virk_fra (hvis historik og virk_fra angivet) + + Eksplicitte indexes: + - Fra tabel.indexes – liste af kolonnenavne-lister + """ + tabel_cfg = file_conf.get("tabel", {}) + table = tabel_cfg.get("blivende") + if not table: + return "" + + table_only = table.split(".")[-1] + forretnings_nøgler = tabel_cfg.get("forretnings_nøgler", []) + historik = tabel_cfg.get("historik") + virk_fra = tabel_cfg.get("virk_fra") + eksplicitte = tabel_cfg.get("indexes", []) + + lines = [] + ix_nr = 1 + + # Automatisk index på forretnings_nøgler + if forretnings_nøgler: + cols = ", ".join(f'"{c}"' for c in forretnings_nøgler) + ix_navn = f"ix_{table_only}_bk" + lines.append(f"CREATE INDEX {ix_navn} ON {table} ({cols})") + lines.append("GO\n") + ix_nr += 1 + + # Automatisk historik-index på forretnings_nøgler + virk_fra + if forretnings_nøgler and historik in ("t1", "t2") and virk_fra: + cols = ", ".join(f'"{c}"' for c in forretnings_nøgler) + f', "{virk_fra}"' + ix_navn = f"ix_{table_only}_bk_virk" + lines.append(f"CREATE INDEX {ix_navn} ON {table} ({cols})") + lines.append("GO\n") + ix_nr += 1 + + # Eksplicitte indexes fra YAML + for ix_cols in eksplicitte: + if isinstance(ix_cols, list): + cols = ", ".join(f'"{c}"' for c in ix_cols) + ix_navn = f"ix_{table_only}_{ix_nr:02d}" + lines.append(f"CREATE INDEX {ix_navn} ON {table} ({cols})") + lines.append("GO\n") + ix_nr += 1 + + return "\n".join(lines) + +def generate_t1_flyt_sql(file_conf: dict) -> str: + """ + Genererer T1 (overskriv) flyt-script. + DELETE på forretnings_nøgler + INSERT af staging-kolonner. + """ + tabel_cfg = file_conf.get("tabel", {}) + staging = tabel_cfg.get("staging") + blivende = tabel_cfg.get("blivende") + forretnings_nøgler = tabel_cfg.get("forretnings_nøgler", []) + staging_kolonner = [k["navn"] for k in _byg_staging_kolonner(file_conf)] + + if not staging or not blivende: + raise ValueError("T1 flyt kræver både 'tabel.staging' og 'tabel.blivende'.") + + cols_block = ",\n ".join(staging_kolonner) + + lines = [] + lines.append(f"-- T1: Overskriv eksisterende rækker baseret på forretningsnøgle") + lines.append(f"-- Trin 1: Slet eksisterende rækker der findes i staging") + lines.append(f"DELETE FROM {blivende}") + lines.append(f"FROM {blivende}") + lines.append(f"JOIN {staging}") + + if forretnings_nøgler: + join_betingelser = [ + f" {blivende}.{k} = {staging}.{k}" + for k in forretnings_nøgler + ] + lines.append(f" ON " + "\n AND ".join(join_betingelser)) + else: + lines.append(f" ON 1 = 1 -- TILPAS: angiv forretnings_nøgler i YAML") + + lines.append("GO\n") + lines.append(f"-- Trin 2: Indsæt nye rækker fra staging") + lines.append(f"INSERT INTO {blivende} (") + lines.append(f" {cols_block}") + lines.append(f")") + lines.append(f"SELECT") + lines.append(f" {cols_block}") + lines.append(f"FROM {staging}") + lines.append("GO\n") + + return "\n".join(lines) + + +def generate_t2_flyt_sql(file_conf: dict) -> str: + """ + Genererer T2 (historik) flyt-script. + Luk eksisterende rækker med UPDATE på virk_til + INSERT nye rækker. + Forudsætter at ny data altid er nyere end eksisterende. + """ + tabel_cfg = file_conf.get("tabel", {}) + staging = tabel_cfg.get("staging") + blivende = tabel_cfg.get("blivende") + forretnings_nøgler = tabel_cfg.get("forretnings_nøgler", []) + virk_fra = tabel_cfg.get("virk_fra") + virk_til = tabel_cfg.get("virk_til") + staging_kolonner = [k["navn"] for k in _byg_staging_kolonner(file_conf)] + + if not staging or not blivende: + raise ValueError("T2 flyt kræver både 'tabel.staging' og 'tabel.blivende'.") + if not virk_fra or not virk_til: + raise ValueError("T2 flyt kræver 'tabel.virk_fra' og 'tabel.virk_til'.") + + cols_block = ",\n ".join(staging_kolonner) + + lines = [] + lines.append(f"-- T2: Historik – luk eksisterende rækker og indsæt nye") + lines.append(f"-- Forudsætning: ny data er altid nyere end eksisterende rækker") + lines.append(f"--") + lines.append(f"-- Trin 1: Luk eksisterende åbne rækker ved at sætte {virk_til}") + lines.append(f"UPDATE {blivende}") + lines.append(f"SET {blivende}.{virk_til} = {staging}.{virk_fra}") + lines.append(f"FROM {blivende}") + lines.append(f"JOIN {staging}") + + if forretnings_nøgler: + join_betingelser = [ + f" {blivende}.{k} = {staging}.{k}" + for k in forretnings_nøgler + ] + lines.append(f" ON " + "\n AND ".join(join_betingelser)) + lines.append(f" AND {blivende}.{virk_til} = '9999-12-31' -- kun åbne rækker") + else: + lines.append(f" ON 1 = 1 -- TILPAS: angiv forretnings_nøgler i YAML") + + lines.append("GO\n") + lines.append(f"-- Trin 2: Indsæt nye rækker fra staging") + lines.append(f"-- virk_til sættes til '9999-12-31' (åben række)") + lines.append(f"INSERT INTO {blivende} (") + lines.append(f" {cols_block},") + lines.append(f" {virk_til}") + lines.append(f")") + lines.append(f"SELECT") + lines.append(f" {cols_block},") + lines.append(f" '9999-12-31'") + lines.append(f"FROM {staging}") + lines.append("GO\n") + + return "\n".join(lines) \ No newline at end of file diff --git a/xsd_til_yaml.py b/xsd_til_yaml.py new file mode 100644 index 0000000..e69de29 diff --git a/xsd_til_yaml_old.py b/xsd_til_yaml_old.py new file mode 100644 index 0000000..406fc37 --- /dev/null +++ b/xsd_til_yaml_old.py @@ -0,0 +1,471 @@ +#!/usr/bin/env python3 +""" +xsd_til_yaml.py + +Genererer YAML-skelet fra en XSD-fil til brug med udpak_semistruktur. + +Producerer: + - skabeloner.yaml – feltskabeloner for alle complexTypes + - nøgler.yaml – nøgle-placeholders for overliggende unbounded-niveauer + - udtræk_.yaml – én fil per maxOccurs="unbounded" element + +Kør med: + python3 xsd_til_yaml.py --xsd bankdata.xsd --output ./yaml_output/ +""" + +from __future__ import annotations +import os +import argparse +from xml.etree import ElementTree as ET +from dataclasses import dataclass, field +from typing import Optional + +# XML Schema namespace +XS = "http://www.w3.org/2001/XMLSchema" + + +# ============================================================ +# Datastrukturer +# ============================================================ + +@dataclass +class XsdFelt: + """Repræsenterer ét felt (attribut eller element) i en complexType.""" + navn: str + er_attribut: bool + xsd_type: str = "xs:string" + påkrævet: bool = False + + @property + def felt_ref(self) -> str: + """Returnerer felt-referencen som den skrives i YAML.""" + return f"@{self.navn}" if self.er_attribut else self.navn + + +@dataclass +class XsdKompleksType: + """Repræsenterer en complexType fra XSD.""" + navn: str + felter: list[XsdFelt] = field(default_factory=list) + + +@dataclass +class XsdUdboundElement: + """Repræsenterer et maxOccurs=unbounded element – bliver til en output-fil.""" + element_navn: str # fx "postering" + type_navn: str # fx "PosteringType" + rod_sti: str # fx "bank.kunder.kunde.konti.konto.posteringer.postering" + overliggende: list[str] # navne på overliggende unbounded elementer + # fx ["kunde", "konto"] + + +# ============================================================ +# XSD Parser +# ============================================================ + +class XsdParser: + """Parser en XSD-fil og udtrækker complexTypes og unbounded elementer.""" + + def __init__(self, xsd_sti: str): + self.xsd_sti = xsd_sti + self.tree = ET.parse(xsd_sti) + self.root = self.tree.getroot() + + # Navngivne complexTypes fra XSD + self.komplekse_typer: dict[str, XsdKompleksType] = {} + + # Unbounded elementer med rod-sti og overliggende niveauer + self.unbounded_elementer: list[XsdUdboundElement] = [] + + def parse(self) -> None: + """Kør den komplette parsing.""" + self._parse_navngivne_typer() + self._find_unbounded_elementer() + + def _xs(self, tag: str) -> str: + """Returnerer fuldt kvalificeret XS-tag.""" + return f"{{{XS}}}{tag}" + + def _parse_navngivne_typer(self) -> None: + """Find alle navngivne complexTypes på topniveau.""" + for ct in self.root.findall(self._xs("complexType")): + navn = ct.get("name") + if not navn: + continue + felter = self._udpak_felter(ct) + self.komplekse_typer[navn] = XsdKompleksType(navn=navn, felter=felter) + + def _udpak_felter(self, ct_node) -> list[XsdFelt]: + """Udpakker alle felter (attributter + simple elementer) fra en complexType.""" + felter = [] + + # Attributter – bliver til @navn i YAML + for attr in ct_node.findall(self._xs("attribute")): + navn = attr.get("name") + xsd_type = attr.get("type", "xs:string") + påkrævet = attr.get("use") == "required" + felter.append(XsdFelt( + navn=navn, + er_attribut=True, + xsd_type=xsd_type, + påkrævet=påkrævet + )) + + # Sequence-elementer + for seq in ct_node.findall(self._xs("sequence")): + for el in seq.findall(self._xs("element")): + el_type = el.get("type", "xs:string") + el_navn = el.get("name") + max_occ = el.get("maxOccurs", "1") + + # Spring komplekse typer og unbounded over – de håndteres separat + if max_occ == "unbounded": + continue + if el_type and not el_type.startswith("xs:"): + continue + + felter.append(XsdFelt( + navn=el_navn, + er_attribut=False, + xsd_type=el_type or "xs:string", + påkrævet=True + )) + + return felter + + def _find_unbounded_elementer(self) -> None: + """ + Gennemgår XSD rekursivt og finder alle maxOccurs=unbounded elementer. + Bygger rod-stier og registrerer overliggende unbounded niveauer. + """ + # Find rod-elementet + rod_el = self.root.find(self._xs("element")) + if rod_el is None: + return + + rod_navn = rod_el.get("name", "rod") + rod_type = rod_el.get("type") + + self._traverse( + node=rod_el, + sti=[rod_navn], + overliggende_unbounded=[], + rod_type=rod_type + ) + + def _hent_type_node(self, type_navn: str): + """Henter en navngiven complexType-node fra XSD.""" + for ct in self.root.findall(self._xs("complexType")): + if ct.get("name") == type_navn: + return ct + return None + + def _traverse( + self, + node, + sti: list[str], + overliggende_unbounded: list[str], + rod_type: Optional[str] = None + ) -> None: + """ + Rekursiv gennemgang af XSD-strukturen. + Finder unbounded elementer og bygger rod-stier. + """ + # Find complexType-noden at arbejde med + if rod_type: + ct_node = self._hent_type_node(rod_type) + else: + ct_node = node.find(self._xs("complexType")) + + if ct_node is None: + return + + # Gennemgå sequence-elementer + for seq in ct_node.findall(self._xs("sequence")): + for el in seq.findall(self._xs("element")): + el_navn = el.get("name") + el_type = el.get("type") + max_occ = el.get("maxOccurs", "1") + + ny_sti = sti + [el_navn] + + if max_occ == "unbounded": + # Dette element bliver en output-fil + rod_sti = ".".join(ny_sti) + self.unbounded_elementer.append(XsdUdboundElement( + element_navn=el_navn, + type_navn=el_type or "", + rod_sti=rod_sti, + overliggende=list(overliggende_unbounded) + )) + # Fortsæt rekursivt med dette som nyt overliggende niveau + self._traverse( + node=el, + sti=ny_sti, + overliggende_unbounded=overliggende_unbounded + [el_navn], + rod_type=el_type + ) + else: + # Ikke unbounded – fortsæt ned hvis det er en kompleks type + if el_type and not el_type.startswith("xs:"): + self._traverse( + node=el, + sti=ny_sti, + overliggende_unbounded=overliggende_unbounded, + rod_type=el_type + ) + + +# ============================================================ +# Hjælpefunktion: XSD type → YAML type +# ============================================================ + +def xsd_type_til_yaml(xsd_type: str) -> dict: + """Konverterer en XSD-type til YAML kolonne-attributter.""" + mapping = { + "xs:string": {}, + "xs:decimal": {"type": "decimal", "decimaler": 2}, + "xs:integer": {"type": "integer"}, + "xs:int": {"type": "integer"}, + "xs:boolean": {"type": "boolean"}, + "xs:date": {"type": "date", "dato_ind": "%Y-%m-%d", "dato_ud": "%d-%m-%Y"}, + "xs:dateTime": {"type": "date", "dato_ind": "%Y-%m-%dT%H:%M:%S", "dato_ud": "SYBASE"}, + } + return mapping.get(xsd_type, {}) + + +# ============================================================ +# CLI +# ============================================================ + +def byg_argument_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Genererer YAML-skelet fra XSD til udpak_semistruktur." + ) + parser.add_argument("--xsd", required=True, help="Sti til XSD-filen") + parser.add_argument("--output", required=True, help="Output-mappe til YAML-filer") + parser.add_argument("--prefix", default="udtræk", + help="Prefix til udtræk-filnavne (standard: udtræk)") + return parser + + +# ============================================================ +# YAML Generatorer +# ============================================================ + +def _yaml_type_linjer(xsd_type: str, indryk: str) -> list[str]: + """Returnerer YAML-linjer for typekonvertering baseret på XSD-type.""" + attrs = xsd_type_til_yaml(xsd_type) + linjer = [] + for k, v in attrs.items(): + if isinstance(v, str): + linjer.append(f'{indryk}{k}: "{v}"') + else: + linjer.append(f'{indryk}{k}: {v}') + return linjer + + +def generer_skabeloner_yaml(xsd: XsdParser, output_mappe: str) -> None: + """Genererer skabeloner.yaml med feltskabeloner for alle relevante complexTypes.""" + linjer = [] + linjer.append("# =============================================================================") + linjer.append("# skabeloner.yaml") + linjer.append("#") + linjer.append("# Auto-genereret feltskabeloner fra XSD.") + linjer.append("# Relative feltnavne – brug prefix_felt naar skabelonen anvendes.") + linjer.append("# =============================================================================") + linjer.append("") + linjer.append("kolonne_skabeloner:") + linjer.append("") + + relevante = {navn: kt for navn, kt in xsd.komplekse_typer.items() if kt.felter} + + for type_navn, kt in relevante.items(): + skabelon_navn = type_navn.replace("Type", "").lower() + linjer.append(f" # Felter fra {type_navn}") + linjer.append(f" {skabelon_navn}:") + + for felt in kt.felter: + linjer.append(f" - navn: {felt.navn}") + linjer.append(f' felt: "{felt.felt_ref}"') + for type_linje in _yaml_type_linjer(felt.xsd_type, " "): + linjer.append(type_linje) + if felt.påkrævet: + linjer.append(f" påkrævet: true") + linjer.append("") + + sti = os.path.join(output_mappe, "skabeloner.yaml") + with open(sti, "w", encoding="utf-8") as f: + f.write("\n".join(linjer)) + print(f" Skrev: {sti}") + + +def generer_nøgler_yaml(xsd: XsdParser, output_mappe: str) -> None: + """Genererer nøgler.yaml med placeholders for forretningsnøgler.""" + overliggende_niveauer = set() + for el in xsd.unbounded_elementer: + for ov in el.overliggende: + overliggende_niveauer.add(ov) + + if not overliggende_niveauer: + print(" Ingen overliggende niveauer – nøgler.yaml ikke genereret.") + return + + linjer = [] + linjer.append("# =============================================================================") + linjer.append("# nøgler.yaml") + linjer.append("#") + linjer.append("# Nøgle-skabeloner der binder output-filer til overliggende niveauer.") + linjer.append("# UDFYLD: Erstat __FELT__ med den korrekte forretningsnøgle.") + linjer.append("# =============================================================================") + linjer.append("") + linjer.append("kolonne_skabeloner:") + linjer.append("") + + skrevne = set() + for el in xsd.unbounded_elementer: + if not el.overliggende: + continue + rod_dele = el.rod_sti.split(".") + for ov_navn in el.overliggende: + skabelon_navn = f"{ov_navn}_nøgle" + if skabelon_navn in skrevne: + continue + skrevne.add(skabelon_navn) + try: + idx = rod_dele.index(ov_navn) + ov_rod_sti = ".".join(rod_dele[:idx + 1]) + except ValueError: + ov_rod_sti = f"__ROD_STI_TIL_{ov_navn.upper()}__" + + linjer.append(f" # Nøgle fra {ov_navn}-niveau") + linjer.append(f" # TODO: Angiv den korrekte forretningsnøgle for {ov_navn}") + linjer.append(f" {skabelon_navn}:") + linjer.append(f" - navn: __FELT__") + linjer.append(" felt: __FELT__ # fx '@id' eller 'nr'") + linjer.append(f" rod: {ov_rod_sti}") + linjer.append("") + + sti = os.path.join(output_mappe, "nøgler.yaml") + with open(sti, "w", encoding="utf-8") as f: + f.write("\n".join(linjer)) + print(f" Skrev: {sti}") + + +def generer_udtræk_yaml(xsd: XsdParser, output_mappe: str, prefix: str) -> None: + """Genererer én udtræk-yaml per unbounded element.""" + for el in xsd.unbounded_elementer: + linjer = [] + linjer.append("# =============================================================================") + linjer.append(f"# {prefix}_{el.element_navn}.yaml") + linjer.append("#") + linjer.append(f"# Auto-genereret udtræk for {el.element_navn}.") + if el.overliggende: + linjer.append(f"# Udfyld nøgle-skabeloner i nøgler.yaml inden brug.") + linjer.append("# =============================================================================") + linjer.append("") + linjer.append("output_filer:") + linjer.append("") + linjer.append(f" - rod: {el.rod_sti}") + linjer.append(f" kolonner:") + + if el.overliggende: + linjer.append(f" # Nøgler fra overliggende niveauer – udfyld i nøgler.yaml") + for ov_navn in el.overliggende: + linjer.append(f" - skabelon: {ov_navn}_nøgle") + linjer.append("") + + if el.type_navn and el.type_navn in xsd.komplekse_typer: + kt = xsd.komplekse_typer[el.type_navn] + if kt.felter: + skabelon_navn = el.type_navn.replace("Type", "").lower() + linjer.append(f" # Felter fra {el.element_navn} – se skabeloner.yaml") + linjer.append(f" - skabelon: {skabelon_navn}") + + linjer.append("") + linjer.append(f" outputs:") + linjer.append(f" - fil_navn: \"{el.element_navn}_{{yyyy}}{{mm}}{{dd}}.txt\"") + linjer.append(f" overskrifter: true") + linjer.append("") + + fil_navn = f"{prefix}_{el.element_navn}.yaml" + sti = os.path.join(output_mappe, fil_navn) + with open(sti, "w", encoding="utf-8") as f: + f.write("\n".join(linjer)) + print(f" Skrev: {sti}") + + +def generer_hoved_config(xsd: XsdParser, output_mappe: str, prefix: str) -> None: + """Genererer en hoved-config der inkluderer alle genererede filer.""" + xsd_basis = os.path.splitext(os.path.basename(xsd.xsd_sti))[0] + linjer = [] + linjer.append("# =============================================================================") + linjer.append(f"# config_{xsd_basis}.yaml") + linjer.append("#") + linjer.append("# Auto-genereret hoved-config. Tilpas config-sektionen.") + linjer.append("# =============================================================================") + linjer.append("") + linjer.append("include:") + linjer.append(" - skabeloner.yaml") + linjer.append(" - nøgler.yaml") + for el in xsd.unbounded_elementer: + linjer.append(f" - {prefix}_{el.element_navn}.yaml") + linjer.append("") + linjer.append("config:") + linjer.append(f" input_fil: {xsd_basis}.xml") + linjer.append(" output_path: __OUTPUT_PATH__") + linjer.append(" logfil: log_{yyyy}{mm}{dd}.txt") + linjer.append(" log_niveau: info") + linjer.append(" log_output: begge") + linjer.append(" encoding: utf-8") + linjer.append(" separator: \"\\t\"") + linjer.append(" skrivetilstand: w") + linjer.append(" dato_ind: \"%Y-%m-%d\"") + linjer.append(" dato_ud: \"%d-%m-%Y\"") + + fil_navn = f"config_{xsd_basis}.yaml" + sti = os.path.join(output_mappe, fil_navn) + with open(sti, "w", encoding="utf-8") as f: + f.write("\n".join(linjer)) + print(f" Skrev: {sti}") + + +def main(): + parser = byg_argument_parser() + args = parser.parse_args() + + if not os.path.exists(args.xsd): + print(f"FEJL: XSD-filen '{args.xsd}' findes ikke.") + return + + os.makedirs(args.output, exist_ok=True) + + print(f"Parser XSD: {args.xsd}") + xsd = XsdParser(args.xsd) + xsd.parse() + + print(f"\nFandt {len(xsd.komplekse_typer)} komplekse typer:") + for navn in xsd.komplekse_typer: + kt = xsd.komplekse_typer[navn] + print(f" {navn}: {len(kt.felter)} felter") + + print(f"\nFandt {len(xsd.unbounded_elementer)} unbounded elementer:") + for el in xsd.unbounded_elementer: + print(f" {el.element_navn} ({el.rod_sti})") + if el.overliggende: + print(f" Overliggende: {el.overliggende}") + + print(f"\nGenererer YAML-filer i: {args.output}") + generer_skabeloner_yaml(xsd, args.output) + generer_nøgler_yaml(xsd, args.output) + generer_udtræk_yaml(xsd, args.output, args.prefix) + generer_hoved_config(xsd, args.output, args.prefix) + + print("\nFærdig! Husk at:") + print(" 1. Udfyld nøgler.yaml med de korrekte forretningsnøgler") + print(" 2. Ret output_path i hoved-config") + print(" 3. Tjek at rod-stier passer til din XML-struktur") + + +if __name__ == "__main__": + main()