""" local_db.py — Lokal SQLite database til offline brug. Håndterer: - Musikbiblioteker (stier der overvåges) - Sange høstet fra filsystemet - Lokale afspilningslister (offline-projekter) - Synkroniseringsstatus mod API """ import sqlite3 import threading from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path DB_PATH = Path.home() / ".linedance" / "local.db" _local = threading.local() _global_conn: sqlite3.Connection | None = None def _get_conn() -> sqlite3.Connection: """Returnerer en global forbindelse i autocommit mode.""" global _global_conn if _global_conn is None: DB_PATH.parent.mkdir(parents=True, exist_ok=True) _global_conn = sqlite3.connect(str(DB_PATH), check_same_thread=False, isolation_level=None) # autocommit _global_conn.row_factory = sqlite3.Row _global_conn.execute("PRAGMA journal_mode=WAL") _global_conn.execute("PRAGMA foreign_keys=ON") return _global_conn def new_conn() -> sqlite3.Connection: """Åbn en frisk forbindelse til brug i tag_editor og dialogs.""" conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys=OFF") # FK checker forhindrer level_id gem return conn @contextmanager def get_db(): """Context manager der bruger app-forbindelsen i autocommit mode. Hver statement committer med det samme — ingen eksplicit transaktion.""" conn = _get_conn() try: yield conn except Exception: raise def get_db_raw() -> sqlite3.Connection: return _get_conn() def init_db(): """Opret alle tabeller hvis de ikke findes.""" conn = _get_conn() # executescript committer automatisk og nulstiller isolation_level # Kør det direkte på den underliggende connection conn.executescript(""" CREATE TABLE IF NOT EXISTS libraries ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT NOT NULL UNIQUE, is_active INTEGER NOT NULL DEFAULT 1, last_full_scan TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS songs ( id TEXT PRIMARY KEY, library_id INTEGER REFERENCES libraries(id), local_path TEXT NOT NULL UNIQUE, title TEXT NOT NULL DEFAULT '', artist TEXT NOT NULL DEFAULT '', album TEXT NOT NULL DEFAULT '', bpm INTEGER NOT NULL DEFAULT 0, duration_sec INTEGER NOT NULL DEFAULT 0, file_format TEXT NOT NULL DEFAULT '', file_modified_at TEXT NOT NULL, file_missing INTEGER NOT NULL DEFAULT 0, extra_tags TEXT NOT NULL DEFAULT '{}', api_song_id TEXT, last_synced_at TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS dance_levels ( id INTEGER PRIMARY KEY AUTOINCREMENT, sort_order INTEGER NOT NULL, name TEXT NOT NULL UNIQUE, description TEXT NOT NULL DEFAULT '', synced_at TEXT ); -- Dans-entitet: navn + niveau er unik kombination CREATE TABLE IF NOT EXISTS dances ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL COLLATE NOCASE, level_id INTEGER REFERENCES dance_levels(id), use_count INTEGER NOT NULL DEFAULT 1, source TEXT NOT NULL DEFAULT 'local', synced_at TEXT, UNIQUE(name, level_id) ); -- Hoveddanse på en sang CREATE TABLE IF NOT EXISTS song_dances ( id INTEGER PRIMARY KEY AUTOINCREMENT, song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE, dance_id INTEGER NOT NULL REFERENCES dances(id), dance_order INTEGER NOT NULL DEFAULT 1, UNIQUE(song_id, dance_id) ); -- Alternativ-danse på en sang CREATE TABLE IF NOT EXISTS song_alt_dances ( id INTEGER PRIMARY KEY AUTOINCREMENT, song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE, dance_id INTEGER NOT NULL REFERENCES dances(id), note TEXT NOT NULL DEFAULT '', source TEXT NOT NULL DEFAULT 'local', created_at TEXT NOT NULL DEFAULT (datetime('now')), UNIQUE(song_id, dance_id) ); CREATE TABLE IF NOT EXISTS playlists ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT NOT NULL DEFAULT '', api_project_id TEXT, last_synced_at TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS playlist_songs ( id INTEGER PRIMARY KEY AUTOINCREMENT, playlist_id INTEGER NOT NULL REFERENCES playlists(id) ON DELETE CASCADE, song_id TEXT NOT NULL REFERENCES songs(id), position INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'pending', UNIQUE(playlist_id, position) ); CREATE TABLE IF NOT EXISTS sync_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, entity_type TEXT NOT NULL, entity_id TEXT NOT NULL, action TEXT NOT NULL, payload TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS event_state ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_songs_title ON songs(title); CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(artist); CREATE INDEX IF NOT EXISTS idx_songs_missing ON songs(file_missing); CREATE INDEX IF NOT EXISTS idx_songs_library ON songs(library_id); CREATE INDEX IF NOT EXISTS idx_song_dances ON song_dances(song_id); CREATE INDEX IF NOT EXISTS idx_song_alt_dances ON song_alt_dances(song_id); CREATE INDEX IF NOT EXISTS idx_dances_name ON dances(name); """) # executescript slår foreign_keys fra — genaktiver conn.execute("PRAGMA foreign_keys=ON") # Tilføj db_version tabel hvis den ikke findes conn.execute(""" CREATE TABLE IF NOT EXISTS db_version ( version INTEGER PRIMARY KEY ) """) # Kør versionsbaserede migrationer _run_versioned_migrations(conn) # Seed standard-niveauer count = conn.execute("SELECT COUNT(*) FROM dance_levels").fetchone()[0] if count == 0: defaults = [ (1, "Begynder", "Passer til alle"), (2, "Let øvet", "Lidt erfaring kræves"), (3, "Øvet", "Kræver regelmæssig træning"), (4, "Erfaren", "For dedikerede dansere"), (5, "Ekspert", "Konkurrenceniveau"), ] for row in defaults: conn.execute( "INSERT OR IGNORE INTO dance_levels (sort_order, name, description) VALUES (?,?,?)", row ) # ── Versionsbaserede migrationer ────────────────────────────────────────────── # Tilføj aldrig gamle — tilføj kun nye versioner nederst. MIGRATIONS: dict[int, list[str]] = { 1: [ "ALTER TABLE songs ADD COLUMN extra_tags TEXT NOT NULL DEFAULT '{}'", ], 2: [ # Ny dans-entitet model """CREATE TABLE IF NOT EXISTS dances ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL COLLATE NOCASE, level_id INTEGER REFERENCES dance_levels(id), use_count INTEGER NOT NULL DEFAULT 1, source TEXT NOT NULL DEFAULT 'local', synced_at TEXT, UNIQUE(name, level_id) )""", """CREATE TABLE IF NOT EXISTS song_alt_dances ( id INTEGER PRIMARY KEY AUTOINCREMENT, song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE, dance_id INTEGER NOT NULL REFERENCES dances(id), note TEXT NOT NULL DEFAULT '', source TEXT NOT NULL DEFAULT 'local', created_at TEXT NOT NULL DEFAULT (datetime('now')), UNIQUE(song_id, dance_id) )""", # Migrer eksisterende song_dances data til ny model # (kører kun på ældre databaser der har dance_name kolonnen) """INSERT OR IGNORE INTO dances (name, level_id, source) SELECT DISTINCT dance_name, level_id, 'local' FROM song_dances WHERE dance_name IS NOT NULL AND dance_name != ''""", ], 3: [ "ALTER TABLE playlists ADD COLUMN tags TEXT NOT NULL DEFAULT ''", ], 4: [ "ALTER TABLE dances ADD COLUMN choreographer TEXT NOT NULL DEFAULT ''", "ALTER TABLE dances ADD COLUMN video_url TEXT NOT NULL DEFAULT ''", "ALTER TABLE dances ADD COLUMN stepsheet_url TEXT NOT NULL DEFAULT ''", "ALTER TABLE dances ADD COLUMN notes TEXT NOT NULL DEFAULT ''", ], 5: [ # Workshop-markering på sang+dans kombination (ikke dans alene) """ALTER TABLE song_dances ADD COLUMN is_workshop INTEGER NOT NULL DEFAULT 0""", """ALTER TABLE song_alt_dances ADD COLUMN is_workshop INTEGER NOT NULL DEFAULT 0""", ], 6: [ # Workshop og dans-valg på selve playlist-sangen """ALTER TABLE playlist_songs ADD COLUMN is_workshop INTEGER NOT NULL DEFAULT 0""", """ALTER TABLE playlist_songs ADD COLUMN dance_override TEXT NOT NULL DEFAULT ''""", ], 7: [ # Linkede server-playlister """ALTER TABLE playlists ADD COLUMN is_linked INTEGER NOT NULL DEFAULT 0""", """ALTER TABLE playlists ADD COLUMN server_permission TEXT NOT NULL DEFAULT 'view'""", ], } def _run_versioned_migrations(conn): """Kør kun migrationer der ikke allerede er kørt vha. db_version tabel.""" row = conn.execute("SELECT version FROM db_version").fetchone() current_version = row["version"] if row else 0 for version in sorted(MIGRATIONS.keys()): if version <= current_version: continue for sql in MIGRATIONS[version]: try: conn.execute(sql) except Exception: pass # kolonnen eksisterer allerede conn.execute( "INSERT OR REPLACE INTO db_version (version) VALUES (?)", (version,) ) # ── Biblioteker ─────────────────────────────────────────────────────────────── def add_library(path: str) -> int: with get_db() as conn: cur = conn.execute( "INSERT OR IGNORE INTO libraries (path) VALUES (?)", (path,) ) if cur.lastrowid: return cur.lastrowid row = conn.execute("SELECT id FROM libraries WHERE path=?", (path,)).fetchone() return row["id"] def get_libraries(active_only: bool = True) -> list[sqlite3.Row]: with get_db() as conn: if active_only: return conn.execute( "SELECT * FROM libraries WHERE is_active=1 ORDER BY path" ).fetchall() return conn.execute("SELECT * FROM libraries ORDER BY path").fetchall() def remove_library(library_id: int): with get_db() as conn: # Marker sange som manglende og løsriv dem fra biblioteket conn.execute( "UPDATE songs SET file_missing=1, library_id=NULL WHERE library_id=?", (library_id,) ) # Nu kan biblioteket slettes uden FK-konflikt conn.execute("DELETE FROM libraries WHERE id=?", (library_id,)) def update_library_scan_time(library_id: int): now = datetime.now(timezone.utc).isoformat() with get_db() as conn: conn.execute( "UPDATE libraries SET last_full_scan=? WHERE id=?", (now, library_id) ) # ── Sange ───────────────────────────────────────────────────────────────────── def upsert_song(song_data: dict) -> str: """ Indsæt eller opdater en sang baseret på local_path. Returnerer song_id. """ import uuid, json with get_db() as conn: existing = conn.execute( "SELECT id FROM songs WHERE local_path=?", (song_data["local_path"],) ).fetchone() extra_tags_json = json.dumps(song_data.get("extra_tags", {}), ensure_ascii=False) if existing: song_id = existing["id"] conn.execute(""" UPDATE songs SET library_id=?, title=?, artist=?, album=?, bpm=?, duration_sec=?, file_format=?, file_modified_at=?, file_missing=0, extra_tags=? WHERE id=? """, ( song_data.get("library_id"), song_data.get("title", ""), song_data.get("artist", ""), song_data.get("album", ""), song_data.get("bpm", 0), song_data.get("duration_sec", 0), song_data.get("file_format", ""), song_data.get("file_modified_at", ""), extra_tags_json, song_id, )) else: song_id = str(uuid.uuid4()) conn.execute(""" INSERT INTO songs (id, library_id, local_path, title, artist, album, bpm, duration_sec, file_format, file_modified_at, extra_tags) VALUES (?,?,?,?,?,?,?,?,?,?,?) """, ( song_id, song_data.get("library_id"), song_data["local_path"], song_data.get("title", ""), song_data.get("artist", ""), song_data.get("album", ""), song_data.get("bpm", 0), song_data.get("duration_sec", 0), song_data.get("file_format", ""), song_data.get("file_modified_at", ""), extra_tags_json, )) # Opdater danse hvis de er med i data — bevar eksisterende og merge if "dances" in song_data: file_dances = [] for dance in song_data["dances"]: name = dance.get("name", dance) if isinstance(dance, dict) else dance if name: file_dances.append(name.strip()) # Find eksisterende song_dances via dances tabel existing = conn.execute(""" SELECT sd.id, d.name, sd.dance_order, d.level_id, d.id as dance_id FROM song_dances sd JOIN dances d ON d.id = sd.dance_id WHERE sd.song_id=? ORDER BY sd.dance_order """, (song_id,)).fetchall() existing_map = {r["name"].lower(): r for r in existing} file_lower = [d.lower() for d in file_dances] # Slet danse der ikke længere er i filen for row in existing: if row["name"].lower() not in file_lower: conn.execute("DELETE FROM song_dances WHERE id=?", (row["id"],)) # Tilføj eller opdater danse fra filen for i, name in enumerate(file_dances, start=1): ex = existing_map.get(name.lower()) if ex: conn.execute( "UPDATE song_dances SET dance_order=? WHERE id=?", (i, ex["id"]) ) else: # Opret eller find dans (name + NULL level = ny dans uden niveau) dance_id = get_or_create_dance(name, None, conn) conn.execute( "INSERT OR IGNORE INTO song_dances (song_id, dance_id, dance_order) " "VALUES (?,?,?)", (song_id, dance_id, i) ) return song_id def mark_song_missing(local_path: str): with get_db() as conn: conn.execute( "UPDATE songs SET file_missing=1 WHERE local_path=?", (local_path,) ) def get_song_by_path(local_path: str) -> sqlite3.Row | None: with get_db() as conn: return conn.execute( "SELECT * FROM songs WHERE local_path=?", (local_path,) ).fetchone() def search_songs(query: str, limit: int = 50) -> list[sqlite3.Row]: """Søg i alle tags — titel, artist, album, danse og alle øvrige tags.""" pattern = f"%{query}%" with get_db() as conn: return conn.execute(""" SELECT DISTINCT s.* FROM songs s LEFT JOIN song_dances sd ON sd.song_id = s.id LEFT JOIN dances d ON d.id = sd.dance_id WHERE s.file_missing = 0 AND ( s.title LIKE ? OR s.artist LIKE ? OR s.album LIKE ? OR d.name LIKE ? OR s.extra_tags LIKE ? ) ORDER BY s.artist, s.title LIMIT ? """, (pattern, pattern, pattern, pattern, pattern, limit)).fetchall() def get_songs_for_library(library_id: int) -> list[sqlite3.Row]: with get_db() as conn: return conn.execute( "SELECT * FROM songs WHERE library_id=? ORDER BY artist, title", (library_id,) ).fetchall() def get_all_song_paths_for_library(library_id: int) -> dict[str, str]: """Returnerer {local_path: file_modified_at} — bruges til fuld scan.""" with get_db() as conn: rows = conn.execute( "SELECT local_path, file_modified_at FROM songs WHERE library_id=?", (library_id,) ).fetchall() return {row["local_path"]: row["file_modified_at"] for row in rows} # ── Afspilningslister ───────────────────────────────────────────────────────── def create_playlist(name: str, description: str = "", tags: str = "") -> int: with get_db() as conn: cur = conn.execute( "INSERT INTO playlists (name, description, tags) VALUES (?,?,?)", (name, description, tags) ) return cur.lastrowid def create_linked_playlist(name: str, api_project_id: str, permission: str = "view", description: str = "", tags: str = "") -> int: """Opret en playliste der er linket til en server-playliste.""" with get_db() as conn: cur = conn.execute( """INSERT INTO playlists (name, description, tags, api_project_id, is_linked, server_permission) VALUES (?,?,?,?,1,?)""", (name, description, tags, api_project_id, permission) ) return cur.lastrowid def update_playlist_tags(playlist_id: int, tags: str): with get_db() as conn: conn.execute( "UPDATE playlists SET tags=? WHERE id=?", (tags, playlist_id) ) def get_all_playlist_tags() -> list[str]: """Returnerer alle unikke tags på tværs af alle playlists, sorteret alfabetisk.""" with get_db() as conn: rows = conn.execute( "SELECT tags FROM playlists WHERE tags != '' AND name != ?", ("__aktiv__",) ).fetchall() tags = set() for row in rows: for tag in row["tags"].split(","): t = tag.strip().lower() if t: tags.add(t) return sorted(tags) def get_playlists(tag_filter: str | None = None) -> list[sqlite3.Row]: """Hent alle navngivne playlists med sang-antal. Filtrer på tag hvis angivet.""" with get_db() as conn: if tag_filter: rows = conn.execute(""" SELECT p.*, COUNT(ps.id) as song_count FROM playlists p LEFT JOIN playlist_songs ps ON ps.playlist_id = p.id WHERE p.name != ? AND ( p.tags LIKE ? OR p.tags LIKE ? OR p.tags LIKE ? OR p.tags = ? ) GROUP BY p.id ORDER BY p.created_at DESC """, ( "__aktiv__", f"{tag_filter},%", f"%, {tag_filter},%", f"%, {tag_filter}", tag_filter, )).fetchall() else: rows = conn.execute(""" SELECT p.*, COUNT(ps.id) as song_count FROM playlists p LEFT JOIN playlist_songs ps ON ps.playlist_id = p.id WHERE p.name != ? GROUP BY p.id ORDER BY p.created_at DESC """, ("__aktiv__",)).fetchall() return rows def add_song_to_playlist(playlist_id: int, song_id: str, position: int | None = None) -> int: with get_db() as conn: if position is None: row = conn.execute( "SELECT MAX(position) as max_pos FROM playlist_songs WHERE playlist_id=?", (playlist_id,) ).fetchone() position = (row["max_pos"] or 0) + 1 cur = conn.execute( "INSERT INTO playlist_songs (playlist_id, song_id, position) VALUES (?,?,?)", (playlist_id, song_id, position) ) return cur.lastrowid def update_playlist_song_status(playlist_song_id: int, status: str): valid = {"pending", "playing", "played", "skipped"} if status not in valid: raise ValueError(f"Ugyldig status: {status}") with get_db() as conn: conn.execute( "UPDATE playlist_songs SET status=? WHERE id=?", (status, playlist_song_id) ) def get_playlist_with_songs(playlist_id: int) -> dict: with get_db() as conn: playlist = conn.execute( "SELECT * FROM playlists WHERE id=?", (playlist_id,) ).fetchone() if not playlist: return {} songs = conn.execute(""" SELECT ps.id as ps_id, ps.position, ps.status, s.*, GROUP_CONCAT(sd.dance_name ORDER BY sd.dance_order) as dances FROM playlist_songs ps JOIN songs s ON s.id = ps.song_id LEFT JOIN song_dances sd ON sd.song_id = s.id WHERE ps.playlist_id = ? GROUP BY ps.id ORDER BY ps.position """, (playlist_id,)).fetchall() return {"playlist": dict(playlist), "songs": [dict(s) for s in songs]} # ── Event-state (gemmes løbende så man kan genstarte efter strømsvigt) ──────── def save_event_state(current_idx: int, statuses: list[str]): """Gem event-fremgang — overskrives ved hver ændring.""" import json with get_db() as conn: conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('current_idx',?)", (str(current_idx),)) conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('statuses',?)", (json.dumps(statuses),)) def load_event_state() -> tuple[int, list[str]] | None: """Indlæs gemt event-fremgang. Returnerer None hvis ingen gemt tilstand.""" import json with get_db() as conn: idx_row = conn.execute( "SELECT value FROM event_state WHERE key='current_idx'" ).fetchone() sta_row = conn.execute( "SELECT value FROM event_state WHERE key='statuses'" ).fetchone() if not idx_row or not sta_row: return None return int(idx_row["value"]), json.loads(sta_row["value"]) def clear_event_state(): """Nulstil gemt event-tilstand (bruges ved 'Start event').""" with get_db() as conn: conn.execute("DELETE FROM event_state") # ── Dans-navne ordbog ───────────────────────────────────────────────────────── # ── Dans-entitet funktioner ─────────────────────────────────────────────────── def get_dance(dance_id: int) -> sqlite3.Row | None: with get_db() as conn: return conn.execute( "SELECT * FROM dances WHERE id=?", (dance_id,) ).fetchone() def update_dance_info(dance_id: int, choreographer: str = "", video_url: str = "", stepsheet_url: str = "", notes: str = ""): with get_db() as conn: conn.execute(""" UPDATE dances SET choreographer = ?, video_url = ?, stepsheet_url = ?, notes = ? WHERE id = ? """, (choreographer.strip(), video_url.strip(), stepsheet_url.strip(), notes.strip(), dance_id)) def get_or_create_dance(name: str, level_id: int | None, conn=None) -> int: """Find eller opret en dans (name + level_id kombination). Returnerer dance_id. conn er valgfri — bruges ved nested kald.""" name = name.strip() close = False if conn is None: conn = new_conn() close = True try: existing = conn.execute( "SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?", (name, level_id) ).fetchone() if existing: conn.execute( "UPDATE dances SET use_count=use_count+1 WHERE id=?", (existing["id"],) ) return existing["id"] conn.execute( "INSERT INTO dances (name, level_id, use_count, source) VALUES (?,?,1,'local')", (name, level_id) ) return conn.execute( "SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?", (name, level_id) ).fetchone()["id"] finally: if close: conn.commit() conn.close() def get_dance_suggestions(prefix: str, limit: int = 20) -> list[dict]: """Returnerer danse der starter med prefix som {id, name, level_id, level_name}. Sorteret efter popularitet — bruges til autoudfyld.""" with get_db() as conn: rows = conn.execute(""" SELECT d.id, d.name, d.level_id, d.use_count, dl.name as level_name, dl.sort_order FROM dances d LEFT JOIN dance_levels dl ON dl.id = d.level_id WHERE d.name LIKE ? COLLATE NOCASE ORDER BY d.use_count DESC, dl.sort_order, d.name LIMIT ? """, (f"{prefix}%", limit)).fetchall() return [dict(r) for r in rows] def get_dances_for_song(song_id: str) -> list[dict]: """Hent hoveddanse for en sang med niveau-info og workshop-flag.""" with get_db() as conn: rows = conn.execute(""" SELECT d.id as dance_id, d.name, d.level_id, dl.name as level_name, sd.dance_order, sd.id as song_dance_id, sd.is_workshop FROM song_dances sd JOIN dances d ON d.id = sd.dance_id LEFT JOIN dance_levels dl ON dl.id = d.level_id WHERE sd.song_id=? ORDER BY sd.dance_order """, (song_id,)).fetchall() return [dict(r) for r in rows] def get_alt_dances_for_song(song_id: str) -> list[dict]: """Hent alternativ-danse for en sang med niveau-info.""" with get_db() as conn: rows = conn.execute(""" SELECT d.id as dance_id, d.name, d.level_id, dl.name as level_name, sad.note, sad.source, sad.id as alt_id FROM song_alt_dances sad JOIN dances d ON d.id = sad.dance_id LEFT JOIN dance_levels dl ON dl.id = d.level_id WHERE sad.song_id=? ORDER BY d.name """, (song_id,)).fetchall() return [dict(r) for r in rows] # ── Dans-niveauer ───────────────────────────────────────────────────────────── def get_dance_levels() -> list[sqlite3.Row]: """Hent alle niveauer sorteret efter sort_order.""" with get_db() as conn: return conn.execute( "SELECT * FROM dance_levels ORDER BY sort_order" ).fetchall() def sync_dance_levels_from_api(levels: list[dict]): """Synkroniser niveauer fra API.""" from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() with get_db() as conn: for lvl in levels: conn.execute(""" INSERT INTO dance_levels (sort_order, name, description, synced_at) VALUES (?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET sort_order = excluded.sort_order, description = excluded.description, synced_at = excluded.synced_at """, (lvl["sort_order"], lvl["name"], lvl.get("description", ""), now)) def sync_dances_from_api(dances: list[dict]): """Synkroniser danse fra API — {name, level_id, use_count}.""" from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() with get_db() as conn: for d in dances: conn.execute(""" INSERT INTO dances (name, level_id, use_count, source, synced_at) VALUES (?, ?, ?, 'community', ?) ON CONFLICT(name, level_id) DO UPDATE SET use_count = MAX(use_count, excluded.use_count), synced_at = excluded.synced_at """, (d["name"], d.get("level_id"), d.get("use_count", 1), now)) # Backwards compat alias def get_dance_name_suggestions(prefix: str, limit: int = 20) -> list[str]: """Returnerer dans-navne som strings — bruges af AutoLineEdit.""" suggestions = get_dance_suggestions(prefix, limit) result = [] for s in suggestions: if s.get("level_name"): result.append(f"{s['name']} / {s['level_name']}") else: result.append(s["name"]) return result