Files
LinedanceAfspiller/linedance-app/local/local_db.py
2026-04-14 17:00:29 +02:00

807 lines
30 KiB
Python

"""
local_db.py — Lokal SQLite database til offline brug.
Håndterer:
- Musikbiblioteker (stier der overvåges)
- Sange høstet fra filsystemet
- Lokale afspilningslister (offline-projekter)
- Synkroniseringsstatus mod API
"""
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
# Location of the SQLite database file, inside ".linedance" under the user's home directory.
DB_PATH = Path.home() / ".linedance" / "local.db"
# Thread-local storage object.
# NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still needed.
_local = threading.local()
# Shared application connection; stays None until _get_conn() opens it on first use.
_global_conn: sqlite3.Connection | None = None
def _get_conn() -> sqlite3.Connection:
    """Return the shared application connection, opening it on first use.

    The connection is opened in autocommit mode (``isolation_level=None``)
    with ``check_same_thread=False``, uses ``sqlite3.Row`` as row factory,
    and has WAL journaling and foreign-key enforcement switched on.
    """
    global _global_conn
    if _global_conn is not None:
        return _global_conn

    # Make sure the directory that holds the database file exists.
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    _global_conn = sqlite3.connect(
        str(DB_PATH),
        check_same_thread=False,
        isolation_level=None,  # autocommit
    )
    _global_conn.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA foreign_keys=ON"):
        _global_conn.execute(pragma)
    return _global_conn
def new_conn() -> sqlite3.Connection:
    """Open a fresh, independent connection (for the tag editor and dialogs).

    Unlike the shared connection, this one keeps sqlite3's default
    transaction handling, so the caller is responsible for committing and
    closing it.
    """
    fresh = sqlite3.connect(str(DB_PATH), check_same_thread=False)
    fresh.row_factory = sqlite3.Row
    # Foreign-key enforcement is turned off on purpose: per the original
    # note, the FK check otherwise prevents level_id from being saved.
    fresh.execute("PRAGMA foreign_keys=OFF")
    return fresh
@contextmanager
def get_db():
    """Context manager that hands out the shared autocommit connection.

    Every statement is committed immediately; no explicit transaction is
    opened or closed here, and exceptions raised inside the ``with`` body
    propagate to the caller unchanged.
    """
    yield _get_conn()
def get_db_raw() -> sqlite3.Connection:
    """Return the shared application connection without a context manager."""
    shared = _get_conn()
    return shared
def init_db():
    """Create all tables and indexes if missing, migrate, and seed levels.

    Steps, in order: run the base schema script, re-assert foreign-key
    enforcement, make sure the ``db_version`` table exists, run the
    versioned migrations, and insert the five default dance levels when
    ``dance_levels`` is empty.
    """
    conn = _get_conn()

    base_schema = """
        CREATE TABLE IF NOT EXISTS libraries (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            path TEXT NOT NULL UNIQUE,
            is_active INTEGER NOT NULL DEFAULT 1,
            last_full_scan TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS songs (
            id TEXT PRIMARY KEY,
            library_id INTEGER REFERENCES libraries(id),
            local_path TEXT NOT NULL UNIQUE,
            title TEXT NOT NULL DEFAULT '',
            artist TEXT NOT NULL DEFAULT '',
            album TEXT NOT NULL DEFAULT '',
            bpm INTEGER NOT NULL DEFAULT 0,
            duration_sec INTEGER NOT NULL DEFAULT 0,
            file_format TEXT NOT NULL DEFAULT '',
            file_modified_at TEXT NOT NULL,
            file_missing INTEGER NOT NULL DEFAULT 0,
            extra_tags TEXT NOT NULL DEFAULT '{}',
            api_song_id TEXT,
            last_synced_at TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS dance_levels (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            sort_order INTEGER NOT NULL,
            name TEXT NOT NULL UNIQUE,
            description TEXT NOT NULL DEFAULT '',
            synced_at TEXT
        );
        -- Dans-entitet: navn + niveau er unik kombination
        CREATE TABLE IF NOT EXISTS dances (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL COLLATE NOCASE,
            level_id INTEGER REFERENCES dance_levels(id),
            use_count INTEGER NOT NULL DEFAULT 1,
            source TEXT NOT NULL DEFAULT 'local',
            synced_at TEXT,
            UNIQUE(name, level_id)
        );
        -- Hoveddanse på en sang
        CREATE TABLE IF NOT EXISTS song_dances (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,
            dance_id INTEGER NOT NULL REFERENCES dances(id),
            dance_order INTEGER NOT NULL DEFAULT 1,
            UNIQUE(song_id, dance_id)
        );
        -- Alternativ-danse på en sang
        CREATE TABLE IF NOT EXISTS song_alt_dances (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,
            dance_id INTEGER NOT NULL REFERENCES dances(id),
            note TEXT NOT NULL DEFAULT '',
            source TEXT NOT NULL DEFAULT 'local',
            created_at TEXT NOT NULL DEFAULT (datetime('now')),
            UNIQUE(song_id, dance_id)
        );
        CREATE TABLE IF NOT EXISTS playlists (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            description TEXT NOT NULL DEFAULT '',
            api_project_id TEXT,
            last_synced_at TEXT,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS playlist_songs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            playlist_id INTEGER NOT NULL REFERENCES playlists(id) ON DELETE CASCADE,
            song_id TEXT NOT NULL REFERENCES songs(id),
            position INTEGER NOT NULL,
            status TEXT NOT NULL DEFAULT 'pending',
            UNIQUE(playlist_id, position)
        );
        CREATE TABLE IF NOT EXISTS sync_queue (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            entity_type TEXT NOT NULL,
            entity_id TEXT NOT NULL,
            action TEXT NOT NULL,
            payload TEXT NOT NULL,
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        );
        CREATE TABLE IF NOT EXISTS event_state (
            key TEXT PRIMARY KEY,
            value TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_songs_title ON songs(title);
        CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(artist);
        CREATE INDEX IF NOT EXISTS idx_songs_missing ON songs(file_missing);
        CREATE INDEX IF NOT EXISTS idx_songs_library ON songs(library_id);
        CREATE INDEX IF NOT EXISTS idx_song_dances ON song_dances(song_id);
        CREATE INDEX IF NOT EXISTS idx_song_alt_dances ON song_alt_dances(song_id);
        CREATE INDEX IF NOT EXISTS idx_dances_name ON dances(name);
    """
    # The whole schema goes through executescript on the shared connection.
    conn.executescript(base_schema)

    # Re-assert foreign-key enforcement after the script.
    # NOTE(review): the original comment says executescript switches
    # foreign_keys off; setting it again is harmless either way.
    conn.execute("PRAGMA foreign_keys=ON")

    # Bookkeeping table for the versioned migrations.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS db_version (
            version INTEGER PRIMARY KEY
        )
    """)

    # Apply the versioned migrations.
    _run_versioned_migrations(conn)

    # Seed the default levels, but only into an empty table.
    level_count = conn.execute("SELECT COUNT(*) FROM dance_levels").fetchone()[0]
    if level_count != 0:
        return
    default_levels = [
        (1, "Begynder", "Passer til alle"),
        (2, "Let øvet", "Lidt erfaring kræves"),
        (3, "Øvet", "Kræver regelmæssig træning"),
        (4, "Erfaren", "For dedikerede dansere"),
        (5, "Ekspert", "Konkurrenceniveau"),
    ]
    conn.executemany(
        "INSERT OR IGNORE INTO dance_levels (sort_order, name, description) VALUES (?,?,?)",
        default_levels,
    )
# ── Versioned migrations ──────────────────────────────────────────────────────
# Never edit an existing entry — only append new versions at the bottom.
MIGRATIONS: dict[int, list[str]] = {
    1: [
        "ALTER TABLE songs ADD COLUMN extra_tags TEXT NOT NULL DEFAULT '{}'",
    ],
    2: [
        # New dance-entity model
        (
            "CREATE TABLE IF NOT EXISTS dances (\n"
            "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
            "name TEXT NOT NULL COLLATE NOCASE,\n"
            "level_id INTEGER REFERENCES dance_levels(id),\n"
            "use_count INTEGER NOT NULL DEFAULT 1,\n"
            "source TEXT NOT NULL DEFAULT 'local',\n"
            "synced_at TEXT,\n"
            "UNIQUE(name, level_id)\n"
            ")"
        ),
        (
            "CREATE TABLE IF NOT EXISTS song_alt_dances (\n"
            "id INTEGER PRIMARY KEY AUTOINCREMENT,\n"
            "song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,\n"
            "dance_id INTEGER NOT NULL REFERENCES dances(id),\n"
            "note TEXT NOT NULL DEFAULT '',\n"
            "source TEXT NOT NULL DEFAULT 'local',\n"
            "created_at TEXT NOT NULL DEFAULT (datetime('now')),\n"
            "UNIQUE(song_id, dance_id)\n"
            ")"
        ),
        # Copy existing song_dances data into the new model
        # (only succeeds on older databases that still have the dance_name column)
        (
            "INSERT OR IGNORE INTO dances (name, level_id, source)\n"
            "SELECT DISTINCT dance_name, level_id, 'local'\n"
            "FROM song_dances WHERE dance_name IS NOT NULL AND dance_name != ''"
        ),
    ],
    3: [
        "ALTER TABLE playlists ADD COLUMN tags TEXT NOT NULL DEFAULT ''",
    ],
    4: [
        "ALTER TABLE dances ADD COLUMN choreographer TEXT NOT NULL DEFAULT ''",
        "ALTER TABLE dances ADD COLUMN video_url TEXT NOT NULL DEFAULT ''",
        "ALTER TABLE dances ADD COLUMN stepsheet_url TEXT NOT NULL DEFAULT ''",
        "ALTER TABLE dances ADD COLUMN notes TEXT NOT NULL DEFAULT ''",
    ],
    5: [
        # Workshop flag on the song+dance combination (not on the dance alone)
        "ALTER TABLE song_dances ADD COLUMN is_workshop INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE song_alt_dances ADD COLUMN is_workshop INTEGER NOT NULL DEFAULT 0",
    ],
    6: [
        # Workshop flag and dance choice on the playlist entry itself
        "ALTER TABLE playlist_songs ADD COLUMN is_workshop INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE playlist_songs ADD COLUMN dance_override TEXT NOT NULL DEFAULT ''",
    ],
    7: [
        # Linked server playlists
        "ALTER TABLE playlists ADD COLUMN is_linked INTEGER NOT NULL DEFAULT 0",
        "ALTER TABLE playlists ADD COLUMN server_permission TEXT NOT NULL DEFAULT 'view'",
    ],
    8: [
        # MusicBrainz and AcoustID matching
        "ALTER TABLE songs ADD COLUMN mbid TEXT",
        "ALTER TABLE songs ADD COLUMN acoustid TEXT",
    ],
}
def _run_versioned_migrations(conn):
"""Kør kun migrationer der ikke allerede er kørt vha. db_version tabel."""
row = conn.execute("SELECT version FROM db_version").fetchone()
current_version = row["version"] if row else 0
for version in sorted(MIGRATIONS.keys()):
if version <= current_version:
continue
for sql in MIGRATIONS[version]:
try:
conn.execute(sql)
except Exception:
pass # kolonnen eksisterer allerede
conn.execute(
"INSERT OR REPLACE INTO db_version (version) VALUES (?)", (version,)
)
# ── Biblioteker ───────────────────────────────────────────────────────────────
def add_library(path: str) -> int:
    """Register a music library path and return its row id.

    If the path is already registered, the existing id is returned.

    The id is always looked up by path after the insert.
    ``cursor.lastrowid`` is not trustworthy after an ``INSERT OR IGNORE``
    that was ignored: it can still carry the rowid of an earlier insert on
    the same connection (possibly into another table), which would be
    returned as a bogus library id.

    Args:
        path: Filesystem path of the library.

    Returns:
        The ``libraries.id`` belonging to ``path``.
    """
    with get_db() as conn:
        conn.execute("INSERT OR IGNORE INTO libraries (path) VALUES (?)", (path,))
        row = conn.execute(
            "SELECT id FROM libraries WHERE path=?", (path,)
        ).fetchone()
        return row["id"]
def get_libraries(active_only: bool = True) -> list[sqlite3.Row]:
    """Return library rows ordered by path.

    Args:
        active_only: When true, only rows with ``is_active=1`` are returned.
    """
    if active_only:
        sql = "SELECT * FROM libraries WHERE is_active=1 ORDER BY path"
    else:
        sql = "SELECT * FROM libraries ORDER BY path"
    with get_db() as conn:
        return conn.execute(sql).fetchall()
def remove_library(library_id: int):
    """Delete a library, keeping its songs as detached, missing entries."""
    steps = (
        # First flag the songs as missing and detach them from the library ...
        "UPDATE songs SET file_missing=1, library_id=NULL WHERE library_id=?",
        # ... so the library row can be deleted without a foreign-key conflict.
        "DELETE FROM libraries WHERE id=?",
    )
    with get_db() as conn:
        for statement in steps:
            conn.execute(statement, (library_id,))
def update_library_scan_time(library_id: int):
    """Stamp the library's ``last_full_scan`` with the current UTC time (ISO 8601)."""
    scanned_at = datetime.now(timezone.utc).isoformat()
    params = (scanned_at, library_id)
    with get_db() as conn:
        conn.execute("UPDATE libraries SET last_full_scan=? WHERE id=?", params)
# ── Sange ─────────────────────────────────────────────────────────────────────
def _normalize_file_dance_names(dances) -> list[str]:
    """Return the stripped, non-empty, case-insensitively unique dance names."""
    names: list[str] = []
    seen: set[str] = set()
    for dance in dances:
        raw = dance.get("name") if isinstance(dance, dict) else dance
        if not isinstance(raw, str):
            # A dict without "name" (or any non-text entry) carries no name.
            continue
        name = raw.strip()
        key = name.lower()
        if not name or key in seen:
            continue
        seen.add(key)
        names.append(name)
    return names


def _merge_file_dances(conn, song_id: str, file_dances: list[str]) -> None:
    """Make the song's main dances match ``file_dances``, keeping known rows."""
    linked = conn.execute("""
        SELECT sd.id, d.name, sd.dance_order, d.level_id, d.id as dance_id
        FROM song_dances sd
        JOIN dances d ON d.id = sd.dance_id
        WHERE sd.song_id=? ORDER BY sd.dance_order
    """, (song_id,)).fetchall()
    linked_by_name = {row["name"].lower(): row for row in linked}
    wanted = {name.lower() for name in file_dances}

    # Drop links for dances that are no longer listed in the file.
    for row in linked:
        if row["name"].lower() not in wanted:
            conn.execute("DELETE FROM song_dances WHERE id=?", (row["id"],))

    # Re-number links that stay and add links for new names.
    for order, name in enumerate(file_dances, start=1):
        known = linked_by_name.get(name.lower())
        if known:
            # Existing link (and the level chosen for its dance) is kept.
            conn.execute(
                "UPDATE song_dances SET dance_order=? WHERE id=?",
                (order, known["id"]),
            )
            continue
        # Name + NULL level: find or create the dance without a level.
        dance_id = get_or_create_dance(name, None, conn)
        conn.execute(
            "INSERT OR IGNORE INTO song_dances (song_id, dance_id, dance_order) "
            "VALUES (?,?,?)",
            (song_id, dance_id, order),
        )


def upsert_song(song_data: dict) -> str:
    """Insert or update a song, matched on ``local_path``.

    An existing row has its metadata overwritten and ``file_missing`` reset
    to 0; otherwise a new row with a fresh UUID is created. When
    ``song_data`` carries a ``"dances"`` list (plain names or dicts with a
    ``"name"`` key), the song's main dances are merged with it: links for
    dances no longer listed are removed, existing links are kept and
    re-ordered, and new names are linked to a level-less dance.

    Dance names are stripped before use; blank names, entries without a
    textual name, and case-insensitive duplicates are skipped.

    Args:
        song_data: Song fields. ``local_path`` is required; all other
            fields fall back to empty/zero defaults.

    Returns:
        The song's id.

    Raises:
        KeyError: If ``song_data`` has no ``"local_path"``.
    """
    import json
    import uuid

    local_path = song_data["local_path"]
    extra_tags_json = json.dumps(song_data.get("extra_tags", {}), ensure_ascii=False)
    library_id = song_data.get("library_id")
    # Column values shared by the UPDATE and INSERT statements, in column order.
    metadata = (
        song_data.get("title", ""),
        song_data.get("artist", ""),
        song_data.get("album", ""),
        song_data.get("bpm", 0),
        song_data.get("duration_sec", 0),
        song_data.get("file_format", ""),
        song_data.get("file_modified_at", ""),
        extra_tags_json,
    )

    with get_db() as conn:
        found = conn.execute(
            "SELECT id FROM songs WHERE local_path=?", (local_path,)
        ).fetchone()
        if found:
            song_id = found["id"]
            conn.execute("""
                UPDATE songs SET
                library_id=?, title=?, artist=?, album=?, bpm=?, duration_sec=?,
                file_format=?, file_modified_at=?, file_missing=0, extra_tags=?
                WHERE id=?
            """, (library_id, *metadata, song_id))
        else:
            song_id = str(uuid.uuid4())
            conn.execute("""
                INSERT INTO songs
                (id, library_id, local_path, title, artist, album,
                bpm, duration_sec, file_format, file_modified_at, extra_tags)
                VALUES (?,?,?,?,?,?,?,?,?,?,?)
            """, (song_id, library_id, local_path, *metadata))

        # Merge dances only when the caller supplied them.
        dances = song_data.get("dances")
        if dances is not None:
            _merge_file_dances(conn, song_id, _normalize_file_dance_names(dances))
    return song_id
def mark_song_missing(local_path: str):
    """Set ``file_missing=1`` on the song stored under ``local_path``."""
    sql = "UPDATE songs SET file_missing=1 WHERE local_path=?"
    with get_db() as conn:
        conn.execute(sql, (local_path,))
def get_song_by_path(local_path: str) -> sqlite3.Row | None:
    """Return the song row stored under ``local_path``, or ``None`` if absent."""
    with get_db() as conn:
        cursor = conn.execute(
            "SELECT * FROM songs WHERE local_path=?", (local_path,)
        )
        return cursor.fetchone()
def search_songs(query: str, limit: int = 50) -> list[sqlite3.Row]:
    """Substring search over title, artist, album, main-dance name and extra tags.

    Songs flagged as missing are excluded. Results are ordered by artist
    and title and capped at ``limit`` rows.
    """
    like = f"%{query}%"
    # One pattern per searched column, followed by the row limit.
    params = (like,) * 5 + (limit,)
    sql = """
        SELECT DISTINCT s.* FROM songs s
        LEFT JOIN song_dances sd ON sd.song_id = s.id
        LEFT JOIN dances d ON d.id = sd.dance_id
        WHERE s.file_missing = 0
        AND (
        s.title LIKE ? OR
        s.artist LIKE ? OR
        s.album LIKE ? OR
        d.name LIKE ? OR
        s.extra_tags LIKE ?
        )
        ORDER BY s.artist, s.title
        LIMIT ?
    """
    with get_db() as conn:
        return conn.execute(sql, params).fetchall()
def get_songs_for_library(library_id: int) -> list[sqlite3.Row]:
    """Return every song row of one library, ordered by artist and title."""
    sql = "SELECT * FROM songs WHERE library_id=? ORDER BY artist, title"
    with get_db() as conn:
        return conn.execute(sql, (library_id,)).fetchall()
def get_all_song_paths_for_library(library_id: int) -> dict[str, str]:
    """Return ``{local_path: file_modified_at}`` for one library (used by the full scan)."""
    with get_db() as conn:
        cursor = conn.execute(
            "SELECT local_path, file_modified_at FROM songs WHERE library_id=?",
            (library_id,)
        )
        modified_by_path = {}
        for record in cursor.fetchall():
            modified_by_path[record["local_path"]] = record["file_modified_at"]
        return modified_by_path
# ── Afspilningslister ─────────────────────────────────────────────────────────
def create_playlist(name: str, description: str = "", tags: str = "") -> int:
    """Insert a new playlist row and return its id."""
    values = (name, description, tags)
    with get_db() as conn:
        inserted = conn.execute(
            "INSERT INTO playlists (name, description, tags) VALUES (?,?,?)",
            values
        )
        return inserted.lastrowid
def create_linked_playlist(name: str, api_project_id: str,
                           permission: str = "view",
                           description: str = "", tags: str = "") -> int:
    """Create a playlist linked to a server playlist and return its id.

    The row is stored with ``is_linked=1``; ``permission`` is written to
    ``server_permission``.
    """
    sql = """INSERT INTO playlists
        (name, description, tags, api_project_id, is_linked, server_permission)
        VALUES (?,?,?,?,1,?)"""
    values = (name, description, tags, api_project_id, permission)
    with get_db() as conn:
        inserted = conn.execute(sql, values)
        return inserted.lastrowid
def update_playlist_tags(playlist_id: int, tags: str):
    """Overwrite the ``tags`` string stored on one playlist."""
    sql = "UPDATE playlists SET tags=? WHERE id=?"
    with get_db() as conn:
        conn.execute(sql, (tags, playlist_id))
def get_all_playlist_tags() -> list[str]:
    """Return every distinct playlist tag, lowercased and sorted alphabetically.

    Tags are read as comma-separated values; the playlist named
    ``__aktiv__`` is left out.
    """
    with get_db() as conn:
        tagged = conn.execute(
            "SELECT tags FROM playlists WHERE tags != '' AND name != ?",
            ("__aktiv__",)
        ).fetchall()
        unique_tags = {
            piece.strip().lower()
            for record in tagged
            for piece in record["tags"].split(",")
        }
        # Blank pieces (e.g. from a trailing comma) are not tags.
        unique_tags.discard("")
        return sorted(unique_tags)
def get_playlists(tag_filter: str | None = None) -> list[sqlite3.Row]:
    """Return all named playlists with their song count, newest first.

    The playlist named ``__aktiv__`` is always excluded.

    Tag filtering is done in Python with the same normalisation that
    ``get_all_playlist_tags`` applies (split on ``,``, strip, lowercase).
    A tag reported by that function therefore always finds its playlists,
    whatever the spacing or letter case of the stored ``tags`` string.

    Args:
        tag_filter: When given and non-empty, only playlists carrying this
            tag are returned.

    Returns:
        Playlist rows, each with an extra ``song_count`` column.
    """
    with get_db() as conn:
        rows = conn.execute("""
            SELECT p.*, COUNT(ps.id) as song_count
            FROM playlists p
            LEFT JOIN playlist_songs ps ON ps.playlist_id = p.id
            WHERE p.name != ?
            GROUP BY p.id
            ORDER BY p.created_at DESC
        """, ("__aktiv__",)).fetchall()

    if not tag_filter:
        return rows

    wanted = tag_filter.strip().lower()
    matching = []
    for row in rows:
        stored = {
            piece.strip().lower() for piece in (row["tags"] or "").split(",")
        }
        stored.discard("")  # blank pieces are not tags
        if wanted in stored:
            matching.append(row)
    return matching
def add_song_to_playlist(playlist_id: int, song_id: str, position: int | None = None) -> int:
    """Add a song to a playlist and return the new ``playlist_songs`` id.

    Without an explicit ``position`` the song goes after the current last
    entry (position 1 for an empty playlist).
    """
    with get_db() as conn:
        if position is None:
            last = conn.execute(
                "SELECT MAX(position) as max_pos FROM playlist_songs WHERE playlist_id=?",
                (playlist_id,)
            ).fetchone()
            # MAX() is NULL for an empty playlist; count from zero then.
            position = (last["max_pos"] or 0) + 1
        inserted = conn.execute(
            "INSERT INTO playlist_songs (playlist_id, song_id, position) VALUES (?,?,?)",
            (playlist_id, song_id, position)
        )
        return inserted.lastrowid
def update_playlist_song_status(playlist_song_id: int, status: str):
    """Set the status of one playlist entry.

    Raises:
        ValueError: If ``status`` is not one of ``pending``, ``playing``,
            ``played`` or ``skipped``.
    """
    allowed = ("pending", "playing", "played", "skipped")
    if status in allowed:
        with get_db() as conn:
            conn.execute(
                "UPDATE playlist_songs SET status=? WHERE id=?",
                (status, playlist_song_id)
            )
        return
    raise ValueError(f"Ugyldig status: {status}")
def get_playlist_with_songs(playlist_id: int) -> dict:
    """Return a playlist together with its songs, in playlist order.

    Dance names are read through the ``dances`` table (``song_dances``
    stores only ``dance_id``) and joined in Python, so the result does not
    depend on SQLite's ordered ``GROUP_CONCAT`` support.

    Args:
        playlist_id: Id of the playlist to load.

    Returns:
        ``{}`` when the playlist does not exist. Otherwise
        ``{"playlist": {...}, "songs": [...]}``, where every song dict
        holds ``ps_id``, ``position``, ``status``, all ``songs`` columns
        and ``dances``: the main-dance names in ``dance_order``, comma
        separated, or ``None`` when the song has no dances.
    """
    with get_db() as conn:
        playlist = conn.execute(
            "SELECT * FROM playlists WHERE id=?", (playlist_id,)
        ).fetchone()
        if not playlist:
            return {}
        song_rows = conn.execute("""
            SELECT ps.id as ps_id, ps.position, ps.status, s.*
            FROM playlist_songs ps
            JOIN songs s ON s.id = ps.song_id
            WHERE ps.playlist_id = ?
            ORDER BY ps.position
        """, (playlist_id,)).fetchall()
        dance_rows = conn.execute("""
            SELECT sd.song_id, d.name
            FROM song_dances sd
            JOIN dances d ON d.id = sd.dance_id
            WHERE sd.song_id IN (
                SELECT song_id FROM playlist_songs WHERE playlist_id = ?
            )
            ORDER BY sd.song_id, sd.dance_order
        """, (playlist_id,)).fetchall()

    # Group the ordered dance names by song id.
    names_by_song: dict[str, list[str]] = {}
    for row in dance_rows:
        names_by_song.setdefault(row["song_id"], []).append(row["name"])

    songs = []
    for row in song_rows:
        song = dict(row)
        names = names_by_song.get(song["id"])
        # None mirrors GROUP_CONCAT's NULL result for a song without dances.
        song["dances"] = ",".join(names) if names else None
        songs.append(song)
    return {"playlist": dict(playlist), "songs": songs}
# ── Event-state (gemmes løbende så man kan genstarte efter strømsvigt) ────────
def save_event_state(current_idx: int, statuses: list[str]):
    """Persist event progress; each call overwrites the previously saved state."""
    import json
    idx_text = str(current_idx)
    statuses_json = json.dumps(statuses)
    with get_db() as conn:
        conn.execute(
            "INSERT OR REPLACE INTO event_state (key,value) VALUES ('current_idx',?)",
            (idx_text,),
        )
        conn.execute(
            "INSERT OR REPLACE INTO event_state (key,value) VALUES ('statuses',?)",
            (statuses_json,),
        )
def load_event_state() -> tuple[int, list[str]] | None:
    """Load saved event progress as ``(current_idx, statuses)``.

    Returns ``None`` unless both values have been stored.
    """
    import json
    with get_db() as conn:
        stored_idx = conn.execute(
            "SELECT value FROM event_state WHERE key='current_idx'"
        ).fetchone()
        stored_statuses = conn.execute(
            "SELECT value FROM event_state WHERE key='statuses'"
        ).fetchone()
    if stored_idx and stored_statuses:
        current_idx = int(stored_idx["value"])
        statuses = json.loads(stored_statuses["value"])
        return current_idx, statuses
    return None
def clear_event_state():
    """Delete all saved event state (used on 'Start event')."""
    wipe_sql = "DELETE FROM event_state"
    with get_db() as conn:
        conn.execute(wipe_sql)
# ── Dans-navne ordbog ─────────────────────────────────────────────────────────
# ── Dans-entitet funktioner ───────────────────────────────────────────────────
def get_dance(dance_id: int) -> sqlite3.Row | None:
    """Return the ``dances`` row with the given id, or ``None`` if absent."""
    with get_db() as conn:
        cursor = conn.execute("SELECT * FROM dances WHERE id=?", (dance_id,))
        return cursor.fetchone()
def update_dance_info(dance_id: int, choreographer: str = "",
                      video_url: str = "", stepsheet_url: str = "",
                      notes: str = ""):
    """Overwrite a dance's choreographer, video URL, step-sheet URL and notes.

    All four texts are stripped of surrounding whitespace before storing.
    """
    cleaned = [
        text.strip()
        for text in (choreographer, video_url, stepsheet_url, notes)
    ]
    with get_db() as conn:
        conn.execute("""
            UPDATE dances SET
            choreographer = ?,
            video_url = ?,
            stepsheet_url = ?,
            notes = ?
            WHERE id = ?
        """, (*cleaned, dance_id))
def get_or_create_dance(name: str, level_id: int | None,
conn=None) -> int:
"""Find eller opret en dans (name + level_id kombination).
Returnerer dance_id. conn er valgfri — bruges ved nested kald."""
name = name.strip()
close = False
if conn is None:
conn = new_conn()
close = True
try:
existing = conn.execute(
"SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?",
(name, level_id)
).fetchone()
if existing:
conn.execute(
"UPDATE dances SET use_count=use_count+1 WHERE id=?",
(existing["id"],)
)
return existing["id"]
conn.execute(
"INSERT INTO dances (name, level_id, use_count, source) VALUES (?,?,1,'local')",
(name, level_id)
)
return conn.execute(
"SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?",
(name, level_id)
).fetchone()["id"]
finally:
if close:
conn.commit()
conn.close()
def get_dance_suggestions(prefix: str, limit: int = 20) -> list[dict]:
    """Return dances whose name starts with ``prefix`` (used for autocomplete).

    Each result is a dict with ``id``, ``name``, ``level_id``,
    ``use_count``, ``level_name`` and ``sort_order``. Results are ordered
    by popularity first, then by level order and name.
    """
    sql = """
        SELECT d.id, d.name, d.level_id, d.use_count,
        dl.name as level_name, dl.sort_order
        FROM dances d
        LEFT JOIN dance_levels dl ON dl.id = d.level_id
        WHERE d.name LIKE ? COLLATE NOCASE
        ORDER BY d.use_count DESC, dl.sort_order, d.name
        LIMIT ?
    """
    with get_db() as conn:
        matches = conn.execute(sql, (f"{prefix}%", limit)).fetchall()
        return list(map(dict, matches))
def get_dances_for_song(song_id: str) -> list[dict]:
    """Return a song's main dances, in ``dance_order``, with level info and workshop flag."""
    sql = """
        SELECT d.id as dance_id, d.name, d.level_id,
        dl.name as level_name, sd.dance_order,
        sd.id as song_dance_id, sd.is_workshop
        FROM song_dances sd
        JOIN dances d ON d.id = sd.dance_id
        LEFT JOIN dance_levels dl ON dl.id = d.level_id
        WHERE sd.song_id=? ORDER BY sd.dance_order
    """
    with get_db() as conn:
        found = conn.execute(sql, (song_id,)).fetchall()
        return list(map(dict, found))
def get_alt_dances_for_song(song_id: str) -> list[dict]:
    """Return a song's alternative dances with level info, ordered by dance name."""
    sql = """
        SELECT d.id as dance_id, d.name, d.level_id,
        dl.name as level_name, sad.note, sad.source, sad.id as alt_id
        FROM song_alt_dances sad
        JOIN dances d ON d.id = sad.dance_id
        LEFT JOIN dance_levels dl ON dl.id = d.level_id
        WHERE sad.song_id=? ORDER BY d.name
    """
    with get_db() as conn:
        found = conn.execute(sql, (song_id,)).fetchall()
        return list(map(dict, found))
# ── Dans-niveauer ─────────────────────────────────────────────────────────────
def get_dance_levels() -> list[sqlite3.Row]:
    """Return all dance levels ordered by ``sort_order``."""
    sql = "SELECT * FROM dance_levels ORDER BY sort_order"
    with get_db() as conn:
        return conn.execute(sql).fetchall()
def sync_dance_levels_from_api(levels: list[dict]):
    """Upsert dance levels received from the API, keyed on the level name.

    Each entry must carry ``sort_order`` and ``name``; ``description`` is
    optional. Every touched row gets the same UTC ``synced_at`` timestamp.
    """
    synced_at = datetime.now(timezone.utc).isoformat()
    upsert_sql = """
        INSERT INTO dance_levels (sort_order, name, description, synced_at)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(name) DO UPDATE SET
        sort_order = excluded.sort_order,
        description = excluded.description,
        synced_at = excluded.synced_at
    """
    with get_db() as conn:
        for level in levels:
            values = (
                level["sort_order"],
                level["name"],
                level.get("description", ""),
                synced_at,
            )
            conn.execute(upsert_sql, values)
def sync_dances_from_api(dances: list[dict]):
    """Merge dances received from the API into the local ``dances`` table.

    A dance is identified by ``name`` (case-insensitive) plus ``level_id``.
    An existing dance keeps the larger of the local and remote
    ``use_count`` and gets a fresh ``synced_at``; an unknown dance is
    inserted with source ``'community'``.

    The match is an explicit ``level_id IS ?`` lookup instead of
    ``ON CONFLICT(name, level_id)``. SQLite treats NULLs as distinct in
    UNIQUE constraints, so the conflict clause never fires for dances
    without a level and every sync would add another duplicate row.

    Args:
        dances: Entries with ``name`` (required), ``level_id`` and
            ``use_count`` (both optional).
    """
    synced_at = datetime.now(timezone.utc).isoformat()
    with get_db() as conn:
        for dance in dances:
            name = dance["name"]
            level_id = dance.get("level_id")
            use_count = dance.get("use_count", 1)
            existing = conn.execute(
                "SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?",
                (name, level_id)
            ).fetchone()
            if existing:
                conn.execute(
                    "UPDATE dances SET use_count = MAX(use_count, ?), synced_at = ? "
                    "WHERE id = ?",
                    (use_count, synced_at, existing["id"])
                )
            else:
                conn.execute(
                    "INSERT INTO dances (name, level_id, use_count, source, synced_at) "
                    "VALUES (?, ?, ?, 'community', ?)",
                    (name, level_id, use_count, synced_at)
                )
# Backwards compat alias
def get_dance_name_suggestions(prefix: str, limit: int = 20) -> list[str]:
    """Return dance suggestions as plain strings (used by AutoLineEdit).

    A dance that has a level is rendered as ``"<name> / <level name>"``,
    one without a level as just its name.
    """
    return [
        f"{entry['name']} / {entry['level_name']}"
        if entry.get("level_name")
        else entry["name"]
        for entry in get_dance_suggestions(prefix, limit)
    ]