""" local_db.py — Lokal SQLite database til offline brug. Håndterer: - Musikbiblioteker (stier der overvåges) - Sange høstet fra filsystemet - Lokale afspilningslister (offline-projekter) - Synkroniseringsstatus mod API """ import sqlite3 import threading from contextlib import contextmanager from datetime import datetime, timezone from pathlib import Path DB_PATH = Path.home() / ".linedance" / "local.db" _local = threading.local() _global_conn: sqlite3.Connection | None = None def _get_conn() -> sqlite3.Connection: """Returnerer en global forbindelse i autocommit mode.""" global _global_conn if _global_conn is None: DB_PATH.parent.mkdir(parents=True, exist_ok=True) _global_conn = sqlite3.connect(str(DB_PATH), check_same_thread=False, isolation_level=None) # autocommit _global_conn.row_factory = sqlite3.Row _global_conn.execute("PRAGMA journal_mode=WAL") _global_conn.execute("PRAGMA foreign_keys=ON") return _global_conn def new_conn() -> sqlite3.Connection: """Åbn en frisk forbindelse til brug i tag_editor og dialogs.""" conn = sqlite3.connect(str(DB_PATH), check_same_thread=False) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys=OFF") # FK checker forhindrer level_id gem return conn @contextmanager def get_db(): """Context manager der bruger app-forbindelsen i autocommit mode. Hver statement committer med det samme — ingen eksplicit transaktion.""" conn = _get_conn() try: yield conn except Exception: raise def get_db_raw() -> sqlite3.Connection: return _get_conn() def init_db(): """Opret alle tabeller hvis de ikke findes.""" conn = _get_conn() # executescript committer automatisk og nulstiller isolation_level # Kør det direkte på den underliggende connection conn.executescript(""" CREATE TABLE IF NOT EXISTS libraries ( id INTEGER PRIMARY KEY AUTOINCREMENT, path TEXT NOT NULL UNIQUE, is_active INTEGER NOT NULL DEFAULT 1, last_full_scan TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS songs ( id TEXT PRIMARY KEY, library_id INTEGER REFERENCES libraries(id), local_path TEXT NOT NULL UNIQUE, title TEXT NOT NULL DEFAULT '', artist TEXT NOT NULL DEFAULT '', album TEXT NOT NULL DEFAULT '', bpm INTEGER NOT NULL DEFAULT 0, duration_sec INTEGER NOT NULL DEFAULT 0, file_format TEXT NOT NULL DEFAULT '', file_modified_at TEXT NOT NULL, file_missing INTEGER NOT NULL DEFAULT 0, extra_tags TEXT NOT NULL DEFAULT '{}', api_song_id TEXT, last_synced_at TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS dance_levels ( id INTEGER PRIMARY KEY AUTOINCREMENT, sort_order INTEGER NOT NULL, name TEXT NOT NULL UNIQUE, description TEXT NOT NULL DEFAULT '', synced_at TEXT ); CREATE TABLE IF NOT EXISTS song_dances ( id INTEGER PRIMARY KEY AUTOINCREMENT, song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE, dance_name TEXT NOT NULL, dance_order INTEGER NOT NULL DEFAULT 1, level_id INTEGER REFERENCES dance_levels(id) ); CREATE TABLE IF NOT EXISTS dance_alternatives ( id TEXT PRIMARY KEY, song_dance_id INTEGER NOT NULL REFERENCES song_dances(id) ON DELETE CASCADE, alt_dance_name TEXT NOT NULL DEFAULT '', level_id INTEGER REFERENCES dance_levels(id), note TEXT NOT NULL DEFAULT '', source TEXT NOT NULL DEFAULT 'local', created_by TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS dance_names ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE COLLATE NOCASE, source TEXT NOT NULL DEFAULT 'local', use_count INTEGER NOT NULL DEFAULT 1, synced_at TEXT ); CREATE TABLE IF NOT EXISTS playlists ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, description TEXT NOT NULL DEFAULT '', api_project_id TEXT, last_synced_at TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS playlist_songs ( id INTEGER PRIMARY KEY AUTOINCREMENT, playlist_id INTEGER NOT NULL REFERENCES playlists(id) ON DELETE CASCADE, song_id TEXT NOT NULL REFERENCES songs(id), position INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'pending', UNIQUE(playlist_id, position) ); CREATE TABLE IF NOT EXISTS sync_queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, entity_type TEXT NOT NULL, entity_id TEXT NOT NULL, action TEXT NOT NULL, payload TEXT NOT NULL, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS event_state ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_songs_title ON songs(title); CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(artist); CREATE INDEX IF NOT EXISTS idx_songs_missing ON songs(file_missing); CREATE INDEX IF NOT EXISTS idx_songs_library ON songs(library_id); CREATE INDEX IF NOT EXISTS idx_song_dances ON song_dances(song_id); """) # executescript slår foreign_keys fra — genaktiver conn.execute("PRAGMA foreign_keys=ON") # Tilføj db_version tabel hvis den ikke findes conn.execute(""" CREATE TABLE IF NOT EXISTS db_version ( version INTEGER PRIMARY KEY ) """) # Kør versionsbaserede migrationer _run_versioned_migrations(conn) # Seed standard-niveauer count = conn.execute("SELECT COUNT(*) FROM dance_levels").fetchone()[0] if count == 0: defaults = [ (1, "Begynder", "Passer til alle"), (2, "Let øvet", "Lidt erfaring kræves"), (3, "Øvet", "Kræver regelmæssig træning"), (4, "Erfaren", "For dedikerede dansere"), (5, "Ekspert", "Konkurrenceniveau"), ] for row in defaults: conn.execute( "INSERT OR IGNORE INTO dance_levels (sort_order, name, description) VALUES (?,?,?)", row ) # ── Versionsbaserede migrationer ────────────────────────────────────────────── # Tilføj aldrig gamle — tilføj kun nye versioner nederst. MIGRATIONS: dict[int, list[str]] = { 1: [ "ALTER TABLE songs ADD COLUMN extra_tags TEXT NOT NULL DEFAULT '{}'", "ALTER TABLE song_dances ADD COLUMN level_id INTEGER REFERENCES dance_levels(id)", "ALTER TABLE dance_alternatives ADD COLUMN alt_dance_name TEXT NOT NULL DEFAULT ''", "ALTER TABLE dance_alternatives ADD COLUMN level_id INTEGER REFERENCES dance_levels(id)", "ALTER TABLE dance_alternatives ADD COLUMN source TEXT NOT NULL DEFAULT 'local'", "ALTER TABLE dance_alternatives ADD COLUMN created_by TEXT NOT NULL DEFAULT ''", ], # Eksempel på fremtidig migration: # 2: ["ALTER TABLE songs ADD COLUMN mbid TEXT"], } def _run_versioned_migrations(conn): """Kør kun migrationer der ikke allerede er kørt vha. db_version tabel.""" row = conn.execute("SELECT version FROM db_version").fetchone() current_version = row["version"] if row else 0 for version in sorted(MIGRATIONS.keys()): if version <= current_version: continue for sql in MIGRATIONS[version]: try: conn.execute(sql) except Exception: pass # kolonnen eksisterer allerede conn.execute( "INSERT OR REPLACE INTO db_version (version) VALUES (?)", (version,) ) # ── Biblioteker ─────────────────────────────────────────────────────────────── def add_library(path: str) -> int: with get_db() as conn: cur = conn.execute( "INSERT OR IGNORE INTO libraries (path) VALUES (?)", (path,) ) if cur.lastrowid: return cur.lastrowid row = conn.execute("SELECT id FROM libraries WHERE path=?", (path,)).fetchone() return row["id"] def get_libraries(active_only: bool = True) -> list[sqlite3.Row]: with get_db() as conn: if active_only: return conn.execute( "SELECT * FROM libraries WHERE is_active=1 ORDER BY path" ).fetchall() return conn.execute("SELECT * FROM libraries ORDER BY path").fetchall() def remove_library(library_id: int): with get_db() as conn: # Marker sange som manglende conn.execute( "UPDATE songs SET file_missing=1 WHERE library_id=?", (library_id,) ) # Slet biblioteket helt conn.execute("DELETE FROM libraries WHERE id=?", (library_id,)) def update_library_scan_time(library_id: int): now = datetime.now(timezone.utc).isoformat() with get_db() as conn: conn.execute( "UPDATE libraries SET last_full_scan=? WHERE id=?", (now, library_id) ) # ── Sange ───────────────────────────────────────────────────────────────────── def upsert_song(song_data: dict) -> str: """ Indsæt eller opdater en sang baseret på local_path. Returnerer song_id. """ import uuid, json with get_db() as conn: existing = conn.execute( "SELECT id FROM songs WHERE local_path=?", (song_data["local_path"],) ).fetchone() extra_tags_json = json.dumps(song_data.get("extra_tags", {}), ensure_ascii=False) if existing: song_id = existing["id"] conn.execute(""" UPDATE songs SET library_id=?, title=?, artist=?, album=?, bpm=?, duration_sec=?, file_format=?, file_modified_at=?, file_missing=0, extra_tags=? WHERE id=? """, ( song_data.get("library_id"), song_data.get("title", ""), song_data.get("artist", ""), song_data.get("album", ""), song_data.get("bpm", 0), song_data.get("duration_sec", 0), song_data.get("file_format", ""), song_data.get("file_modified_at", ""), extra_tags_json, song_id, )) else: song_id = str(uuid.uuid4()) conn.execute(""" INSERT INTO songs (id, library_id, local_path, title, artist, album, bpm, duration_sec, file_format, file_modified_at, extra_tags) VALUES (?,?,?,?,?,?,?,?,?,?,?) """, ( song_id, song_data.get("library_id"), song_data["local_path"], song_data.get("title", ""), song_data.get("artist", ""), song_data.get("album", ""), song_data.get("bpm", 0), song_data.get("duration_sec", 0), song_data.get("file_format", ""), song_data.get("file_modified_at", ""), extra_tags_json, )) # Opdater danse hvis de er med i data — bevar level_id og alternativer if "dances" in song_data: file_dances = [] for dance in song_data["dances"]: if isinstance(dance, dict): file_dances.append(dance.get("name", "")) else: file_dances.append(dance) file_dances = [d for d in file_dances if d] # Hent eksisterende danse med level_id og alternativer existing = conn.execute( "SELECT id, dance_name, dance_order, level_id FROM song_dances " "WHERE song_id=? ORDER BY dance_order", (song_id,) ).fetchall() existing_map = {r["dance_name"].lower(): r for r in existing} # Slet danse der ikke længere er i filen file_lower = [d.lower() for d in file_dances] for row in existing: if row["dance_name"].lower() not in file_lower: conn.execute( "DELETE FROM dance_alternatives WHERE song_dance_id=?", (row["id"],) ) conn.execute("DELETE FROM song_dances WHERE id=?", (row["id"],)) # Tilføj eller opdater danse fra filen for i, name in enumerate(file_dances, start=1): ex = existing_map.get(name.lower()) if ex: # Bevar level_id — opdater kun dance_order conn.execute( "UPDATE song_dances SET dance_order=? WHERE id=?", (i, ex["id"]) ) else: # Ny dans — ingen level_id endnu conn.execute( "INSERT INTO song_dances (song_id, dance_name, dance_order, level_id) " "VALUES (?,?,?,NULL)", (song_id, name, i) ) return song_id def mark_song_missing(local_path: str): with get_db() as conn: conn.execute( "UPDATE songs SET file_missing=1 WHERE local_path=?", (local_path,) ) def get_song_by_path(local_path: str) -> sqlite3.Row | None: with get_db() as conn: return conn.execute( "SELECT * FROM songs WHERE local_path=?", (local_path,) ).fetchone() def search_songs(query: str, limit: int = 50) -> list[sqlite3.Row]: """Søg i alle tags — titel, artist, album, danse og alle øvrige tags.""" pattern = f"%{query}%" with get_db() as conn: return conn.execute(""" SELECT DISTINCT s.* FROM songs s LEFT JOIN song_dances sd ON sd.song_id = s.id WHERE s.file_missing = 0 AND ( s.title LIKE ? OR s.artist LIKE ? OR s.album LIKE ? OR sd.dance_name LIKE ? OR s.extra_tags LIKE ? ) ORDER BY s.artist, s.title LIMIT ? """, (pattern, pattern, pattern, pattern, pattern, limit)).fetchall() def get_songs_for_library(library_id: int) -> list[sqlite3.Row]: with get_db() as conn: return conn.execute( "SELECT * FROM songs WHERE library_id=? ORDER BY artist, title", (library_id,) ).fetchall() def get_all_song_paths_for_library(library_id: int) -> dict[str, str]: """Returnerer {local_path: file_modified_at} — bruges til fuld scan.""" with get_db() as conn: rows = conn.execute( "SELECT local_path, file_modified_at FROM songs WHERE library_id=?", (library_id,) ).fetchall() return {row["local_path"]: row["file_modified_at"] for row in rows} # ── Afspilningslister ───────────────────────────────────────────────────────── def create_playlist(name: str, description: str = "") -> int: with get_db() as conn: cur = conn.execute( "INSERT INTO playlists (name, description) VALUES (?,?)", (name, description) ) return cur.lastrowid def get_playlists() -> list[sqlite3.Row]: with get_db() as conn: return conn.execute( "SELECT * FROM playlists ORDER BY created_at DESC" ).fetchall() def add_song_to_playlist(playlist_id: int, song_id: str, position: int | None = None) -> int: with get_db() as conn: if position is None: row = conn.execute( "SELECT MAX(position) as max_pos FROM playlist_songs WHERE playlist_id=?", (playlist_id,) ).fetchone() position = (row["max_pos"] or 0) + 1 cur = conn.execute( "INSERT INTO playlist_songs (playlist_id, song_id, position) VALUES (?,?,?)", (playlist_id, song_id, position) ) return cur.lastrowid def update_playlist_song_status(playlist_song_id: int, status: str): valid = {"pending", "playing", "played", "skipped"} if status not in valid: raise ValueError(f"Ugyldig status: {status}") with get_db() as conn: conn.execute( "UPDATE playlist_songs SET status=? WHERE id=?", (status, playlist_song_id) ) def get_playlist_with_songs(playlist_id: int) -> dict: with get_db() as conn: playlist = conn.execute( "SELECT * FROM playlists WHERE id=?", (playlist_id,) ).fetchone() if not playlist: return {} songs = conn.execute(""" SELECT ps.id as ps_id, ps.position, ps.status, s.*, GROUP_CONCAT(sd.dance_name ORDER BY sd.dance_order) as dances FROM playlist_songs ps JOIN songs s ON s.id = ps.song_id LEFT JOIN song_dances sd ON sd.song_id = s.id WHERE ps.playlist_id = ? GROUP BY ps.id ORDER BY ps.position """, (playlist_id,)).fetchall() return {"playlist": dict(playlist), "songs": [dict(s) for s in songs]} # ── Event-state (gemmes løbende så man kan genstarte efter strømsvigt) ──────── def save_event_state(current_idx: int, statuses: list[str]): """Gem event-fremgang — overskrives ved hver ændring.""" import json with get_db() as conn: conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('current_idx',?)", (str(current_idx),)) conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('statuses',?)", (json.dumps(statuses),)) def load_event_state() -> tuple[int, list[str]] | None: """Indlæs gemt event-fremgang. Returnerer None hvis ingen gemt tilstand.""" import json with get_db() as conn: idx_row = conn.execute( "SELECT value FROM event_state WHERE key='current_idx'" ).fetchone() sta_row = conn.execute( "SELECT value FROM event_state WHERE key='statuses'" ).fetchone() if not idx_row or not sta_row: return None return int(idx_row["value"]), json.loads(sta_row["value"]) def clear_event_state(): """Nulstil gemt event-tilstand (bruges ved 'Start event').""" with get_db() as conn: conn.execute("DELETE FROM event_state") # ── Dans-navne ordbog ───────────────────────────────────────────────────────── def get_dance_name_suggestions(prefix: str, limit: int = 20) -> list[str]: """Returnerer dans-navne der starter med prefix fra alle kendte sources, sorteret efter popularitet. Inkluderer navne fra song_dances og dance_alternatives.""" with get_db() as conn: # Hent fra dance_names ordbog (primær kilde) rows = conn.execute(""" SELECT name, use_count FROM dance_names WHERE name LIKE ? COLLATE NOCASE ORDER BY use_count DESC, name LIMIT ? """, (f"{prefix}%", limit)).fetchall() names = {r["name"]: r["use_count"] for r in rows} # Supplér med navne direkte fra song_dances der ikke er i ordbogen extra = conn.execute(""" SELECT DISTINCT dance_name as name FROM song_dances WHERE dance_name LIKE ? COLLATE NOCASE LIMIT ? """, (f"{prefix}%", limit)).fetchall() for r in extra: if r["name"] not in names: names[r["name"]] = 0 # Supplér med alternativ-danse extra2 = conn.execute(""" SELECT DISTINCT alt_dance_name as name FROM dance_alternatives WHERE alt_dance_name LIKE ? COLLATE NOCASE LIMIT ? """, (f"{prefix}%", limit)).fetchall() for r in extra2: if r["name"] not in names: names[r["name"]] = 0 # Sorter: kendte navne med høj use_count først, derefter alfabetisk return sorted(names.keys(), key=lambda n: (-names[n], n.lower()))[:limit] def register_dance_name(name: str, source: str = "local"): """Tilføj eller opdater et dans-navn i ordbogen.""" name = name.strip() if not name: return with get_db() as conn: existing = conn.execute( "SELECT id, use_count FROM dance_names WHERE name=? COLLATE NOCASE", (name,) ).fetchone() if existing: conn.execute( "UPDATE dance_names SET use_count=use_count+1 WHERE id=?", (existing["id"],) ) else: conn.execute( "INSERT INTO dance_names (name, source, use_count) VALUES (?,?,1)", (name, source) ) def sync_dance_names_from_api(names: list[dict]): """Synkroniser dans-navne fra API — {name, use_count}.""" from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() with get_db() as conn: for item in names: conn.execute(""" INSERT INTO dance_names (name, source, use_count, synced_at) VALUES (?, 'community', ?, ?) ON CONFLICT(name) DO UPDATE SET use_count = MAX(use_count, excluded.use_count), synced_at = excluded.synced_at """, (item["name"], item.get("use_count", 1), now)) # ── Dans-niveauer ───────────────────────────────────────────────────────────── def get_dance_levels() -> list[sqlite3.Row]: """Hent alle niveauer sorteret efter sort_order.""" with get_db() as conn: return conn.execute( "SELECT * FROM dance_levels ORDER BY sort_order" ).fetchall() def sync_dance_levels_from_api(levels: list[dict]): """Synkroniser niveauer fra API — {sort_order, name, description}.""" from datetime import datetime, timezone now = datetime.now(timezone.utc).isoformat() with get_db() as conn: for lvl in levels: conn.execute(""" INSERT INTO dance_levels (sort_order, name, description, synced_at) VALUES (?, ?, ?, ?) ON CONFLICT(name) DO UPDATE SET sort_order = excluded.sort_order, description = excluded.description, synced_at = excluded.synced_at """, (lvl["sort_order"], lvl["name"], lvl.get("description", ""), now)) # ── Dans-alternativer ───────────────────────────────────────────────────────── def get_alternatives_for_dance(song_dance_id: int) -> list[sqlite3.Row]: with get_db() as conn: return conn.execute(""" SELECT da.*, dl.name as level_name, dl.sort_order as level_sort FROM dance_alternatives da LEFT JOIN dance_levels dl ON dl.id = da.level_id WHERE da.song_dance_id = ? ORDER BY da.source, dl.sort_order """, (song_dance_id,)).fetchall() def add_alternative(song_dance_id: int, alt_dance_name: str, level_id: int | None = None, note: str = "", source: str = "local", created_by: str = "") -> str: import uuid as _uuid alt_id = str(_uuid.uuid4()) with get_db() as conn: conn.execute(""" INSERT INTO dance_alternatives (id, song_dance_id, alt_dance_name, level_id, note, source, created_by) VALUES (?,?,?,?,?,?,?) """, (alt_id, song_dance_id, alt_dance_name.strip(), level_id, note, source, created_by)) # Registrer alt-dans-navne i ordbogen register_dance_name(alt_dance_name, source=source) return alt_id def remove_alternative(alt_id: str): with get_db() as conn: conn.execute("DELETE FROM dance_alternatives WHERE id=?", (alt_id,))