This commit is contained in:
2026-04-11 00:35:05 +02:00
parent 78a2cf79cd
commit 181cb28a86
3594 changed files with 0 additions and 406408 deletions

View File

@@ -1,29 +0,0 @@
"""
local/ — Lokalt data-lag til Linedance-afspilleren.
Moduler:
local_db.py — SQLite database (sange, afspilningslister, biblioteker)
tag_reader.py — Læser/skriver metadata fra lydfiler
file_watcher.py — Overvåger mapper og holder SQLite opdateret
Typisk brug ved app-start:
from local.local_db import init_db
from local.file_watcher import get_watcher
# Initialiser database
init_db()
# Start fil-overvågning (on_change kaldes ved ændringer — opdater GUI)
def on_file_change(event_type, path, song_id):
print(f"{event_type}: {path}")
watcher = get_watcher(on_change=on_file_change)
watcher.start()
# Tilføj et bibliotek (scanner automatisk + starter overvågning)
watcher.add_library("/home/carsten/Musik")
# Ved app-luk:
watcher.stop()
"""

View File

@@ -1,274 +0,0 @@
"""
file_watcher.py — Overvåger musikbiblioteker og holder SQLite opdateret.
Bruger watchdog til at reagere på fil-ændringer i realtid.
Kører fuld scan ved opstart for at fange ændringer lavet mens appen var lukket.
"""
import threading
import time
import logging
from pathlib import Path
from typing import Callable
try:
from watchdog.observers import Observer
from watchdog.events import (
FileSystemEventHandler,
FileCreatedEvent,
FileModifiedEvent,
FileDeletedEvent,
FileMovedEvent,
)
WATCHDOG_AVAILABLE = True
except ImportError:
WATCHDOG_AVAILABLE = False
print("Advarsel: watchdog ikke installeret — fil-overvågning deaktiveret")
from local.tag_reader import is_supported, read_tags, get_file_modified_at
from local.local_db import (
get_libraries, add_library, remove_library,
upsert_song, mark_song_missing,
get_all_song_paths_for_library, update_library_scan_time,
)
logger = logging.getLogger(__name__)
class MusicLibraryHandler(FileSystemEventHandler):
"""
Reagerer på ændringer i et musikbibliotek.
Kører i watchdog's baggrundstråd — DB-operationer er thread-safe via WAL.
"""
def __init__(self, library_id: int, on_change: Callable | None = None):
self.library_id = library_id
self.on_change = on_change # valgfrit callback til GUI-opdatering
self._debounce: dict[str, float] = {}
self._debounce_lock = threading.Lock()
def _debounced(self, path: str) -> bool:
"""
Forhindrer at samme fil behandles flere gange på kort tid.
Nogle programmer gemmer filer i flere trin (temp-fil → rename).
"""
now = time.time()
with self._debounce_lock:
last = self._debounce.get(path, 0)
if now - last < 1.5: # 1.5 sekunder cooldown
return False
self._debounce[path] = now
return True
def on_created(self, event):
if event.is_directory or not is_supported(event.src_path):
return
if self._debounced(event.src_path):
self._process_file(event.src_path)
def on_modified(self, event):
if event.is_directory or not is_supported(event.src_path):
return
if self._debounced(event.src_path):
self._process_file(event.src_path)
def on_deleted(self, event):
if event.is_directory or not is_supported(event.src_path):
return
logger.info(f"Fil slettet: {event.src_path}")
mark_song_missing(event.src_path)
if self.on_change:
self.on_change("deleted", event.src_path, None)
def on_moved(self, event):
if event.is_directory:
return
# Behandl som slet + opret
if is_supported(event.src_path):
mark_song_missing(event.src_path)
if is_supported(event.dest_path):
if self._debounced(event.dest_path):
self._process_file(event.dest_path)
def _process_file(self, path: str):
"""Læs tags og gem i SQLite."""
try:
logger.debug(f"Høster tags fra: {path}")
tags = read_tags(path)
tags["library_id"] = self.library_id
song_id = upsert_song(tags)
logger.info(f"Opdateret: {Path(path).name} ({len(tags.get('dances', []))} danse)")
if self.on_change:
self.on_change("upserted", path, song_id)
except Exception as e:
logger.error(f"Fejl ved behandling af {path}: {e}")
class LibraryWatcher:
"""
Styrer watchdog-observere for alle aktive musikbiblioteker.
Én instans per applikation.
"""
def __init__(self, on_change: Callable | None = None):
self.on_change = on_change
self._observer: Observer | None = None
self._running = False
def start(self):
"""Start overvågning af alle aktive biblioteker + kør fuld scan."""
if not WATCHDOG_AVAILABLE:
logger.warning("watchdog ikke tilgængelig — starter kun fuld scan")
self._full_scan_all()
return
self._observer = Observer()
libraries = get_libraries(active_only=True)
for lib in libraries:
path = Path(lib["path"])
if not path.exists():
logger.warning(f"Bibliotek findes ikke: {path}")
continue
handler = MusicLibraryHandler(lib["id"], self.on_change)
self._observer.schedule(handler, str(path), recursive=True)
logger.info(f"Overvåger: {path}")
self._observer.start()
self._running = True
# Fuld scan i baggrundstråd så GUI ikke blokeres
threading.Thread(target=self._full_scan_all, daemon=True).start()
def stop(self):
if self._observer and self._running:
self._observer.stop()
self._observer.join()
self._running = False
def add_library(self, path: str) -> int:
"""Tilføj et nyt bibliotek og start overvågning af det med det samme."""
library_id = add_library(path)
if self._observer and self._running:
handler = MusicLibraryHandler(library_id, self.on_change)
self._observer.schedule(handler, path, recursive=True)
logger.info(f"Tilføjet bibliotek: {path}")
# Scan det nye bibliotek i baggrunden
threading.Thread(
target=self._full_scan_library,
args=(library_id, path),
daemon=True,
).start()
return library_id
def remove_library(self, library_id: int):
"""Deaktiver bibliotek. Watchdog stopper automatisk ved næste restart."""
remove_library(library_id)
# Genstart observer for at fjerne watch (watchdog understøtter ikke unschedule by id)
if self._observer and self._running:
self._observer.unschedule_all()
self._reschedule_all()
def _reschedule_all(self):
"""Genplanlæg alle aktive biblioteker på observeren."""
for lib in get_libraries(active_only=True):
path = Path(lib["path"])
if path.exists():
handler = MusicLibraryHandler(lib["id"], self.on_change)
self._observer.schedule(handler, str(path), recursive=True)
def _full_scan_all(self):
"""Kør fuld scan på alle aktive biblioteker."""
for lib in get_libraries(active_only=True):
path = Path(lib["path"])
if path.exists():
self._full_scan_library(lib["id"], str(path))
def _full_scan_library(self, library_id: int, library_path: str):
"""
Sammenligner filer på disk med SQLite og synkroniserer forskelle.
Håndterer utilgængelige mapper og symlinks sikkert.
"""
logger.info(f"Fuld scan starter: {library_path}")
base = Path(library_path)
# Tjek at mappen faktisk er tilgængelig — med timeout
if not self._path_accessible(base):
logger.warning(f"Bibliotek ikke tilgængeligt (timeout eller ingen adgang): {library_path}")
return
known = get_all_song_paths_for_library(library_id)
found_paths = set()
processed = 0
errors = 0
import os
for dirpath, dirnames, filenames in os.walk(
str(base), followlinks=False,
onerror=lambda e: logger.warning(f"Adgang nægtet: {e}")
):
for filename in filenames:
file_path = Path(dirpath) / filename
try:
if not is_supported(file_path):
continue
path_str = str(file_path)
found_paths.add(path_str)
disk_modified = get_file_modified_at(file_path)
if path_str not in known or known[path_str] != disk_modified:
tags = read_tags(file_path)
tags["library_id"] = library_id
upsert_song(tags)
processed += 1
if self.on_change:
self.on_change("upserted", path_str, None)
except Exception as e:
logger.error(f"Scan-fejl for {file_path}: {e}")
errors += 1
# Marker forsvundne filer
missing_count = 0
for known_path in known:
if known_path not in found_paths:
mark_song_missing(known_path)
missing_count += 1
if self.on_change:
self.on_change("deleted", known_path, None)
update_library_scan_time(library_id)
logger.info(
f"Scan færdig: {library_path}"
f"{processed} opdateret, {missing_count} mangler, {errors} fejl"
)
def _path_accessible(self, path: Path, timeout_sec: float = 5.0) -> bool:
"""Tjek om en sti er tilgængelig inden for timeout."""
import threading
result = [False]
def check():
try:
result[0] = path.exists() and path.is_dir()
except Exception:
result[0] = False
t = threading.Thread(target=check, daemon=True)
t.start()
t.join(timeout=timeout_sec)
return result[0]
# ── Singleton til brug i appen ────────────────────────────────────────────────
_watcher: LibraryWatcher | None = None
def get_watcher(on_change: Callable | None = None) -> LibraryWatcher:
"""Returnerer den globale LibraryWatcher-instans."""
global _watcher
if _watcher is None:
_watcher = LibraryWatcher(on_change=on_change)
return _watcher

View File

@@ -1,666 +0,0 @@
"""
local_db.py — Lokal SQLite database til offline brug.
Håndterer:
- Musikbiblioteker (stier der overvåges)
- Sange høstet fra filsystemet
- Lokale afspilningslister (offline-projekter)
- Synkroniseringsstatus mod API
"""
import sqlite3
import threading
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
DB_PATH = Path.home() / ".linedance" / "local.db"
_local = threading.local()
_global_conn: sqlite3.Connection | None = None
def _get_conn() -> sqlite3.Connection:
"""Returnerer en global forbindelse i autocommit mode."""
global _global_conn
if _global_conn is None:
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
_global_conn = sqlite3.connect(str(DB_PATH), check_same_thread=False,
isolation_level=None) # autocommit
_global_conn.row_factory = sqlite3.Row
_global_conn.execute("PRAGMA journal_mode=WAL")
_global_conn.execute("PRAGMA foreign_keys=ON")
return _global_conn
def new_conn() -> sqlite3.Connection:
"""Åbn en frisk forbindelse til brug i tag_editor og dialogs."""
conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys=OFF") # FK checker forhindrer level_id gem
return conn
@contextmanager
def get_db():
"""Context manager der bruger app-forbindelsen i autocommit mode.
Hver statement committer med det samme — ingen eksplicit transaktion."""
conn = _get_conn()
try:
yield conn
except Exception:
raise
def get_db_raw() -> sqlite3.Connection:
return _get_conn()
def init_db():
"""Opret alle tabeller hvis de ikke findes."""
conn = _get_conn()
# executescript committer automatisk og nulstiller isolation_level
# Kør det direkte på den underliggende connection
conn.executescript("""
CREATE TABLE IF NOT EXISTS libraries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
path TEXT NOT NULL UNIQUE,
is_active INTEGER NOT NULL DEFAULT 1,
last_full_scan TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS songs (
id TEXT PRIMARY KEY,
library_id INTEGER REFERENCES libraries(id),
local_path TEXT NOT NULL UNIQUE,
title TEXT NOT NULL DEFAULT '',
artist TEXT NOT NULL DEFAULT '',
album TEXT NOT NULL DEFAULT '',
bpm INTEGER NOT NULL DEFAULT 0,
duration_sec INTEGER NOT NULL DEFAULT 0,
file_format TEXT NOT NULL DEFAULT '',
file_modified_at TEXT NOT NULL,
file_missing INTEGER NOT NULL DEFAULT 0,
extra_tags TEXT NOT NULL DEFAULT '{}',
api_song_id TEXT,
last_synced_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS dance_levels (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sort_order INTEGER NOT NULL,
name TEXT NOT NULL UNIQUE,
description TEXT NOT NULL DEFAULT '',
synced_at TEXT
);
CREATE TABLE IF NOT EXISTS song_dances (
id INTEGER PRIMARY KEY AUTOINCREMENT,
song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,
dance_name TEXT NOT NULL,
dance_order INTEGER NOT NULL DEFAULT 1,
level_id INTEGER REFERENCES dance_levels(id)
);
CREATE TABLE IF NOT EXISTS dance_alternatives (
id TEXT PRIMARY KEY,
song_dance_id INTEGER NOT NULL REFERENCES song_dances(id) ON DELETE CASCADE,
alt_dance_name TEXT NOT NULL DEFAULT '',
level_id INTEGER REFERENCES dance_levels(id),
note TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT 'local',
created_by TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS dance_names (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE COLLATE NOCASE,
source TEXT NOT NULL DEFAULT 'local',
use_count INTEGER NOT NULL DEFAULT 1,
synced_at TEXT
);
CREATE TABLE IF NOT EXISTS playlists (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
description TEXT NOT NULL DEFAULT '',
api_project_id TEXT,
last_synced_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS playlist_songs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
playlist_id INTEGER NOT NULL REFERENCES playlists(id) ON DELETE CASCADE,
song_id TEXT NOT NULL REFERENCES songs(id),
position INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
UNIQUE(playlist_id, position)
);
CREATE TABLE IF NOT EXISTS sync_queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
action TEXT NOT NULL,
payload TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS event_state (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_songs_title ON songs(title);
CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(artist);
CREATE INDEX IF NOT EXISTS idx_songs_missing ON songs(file_missing);
CREATE INDEX IF NOT EXISTS idx_songs_library ON songs(library_id);
CREATE INDEX IF NOT EXISTS idx_song_dances ON song_dances(song_id);
""")
# executescript slår foreign_keys fra — genaktiver
conn.execute("PRAGMA foreign_keys=ON")
# Tilføj db_version tabel hvis den ikke findes
conn.execute("""
CREATE TABLE IF NOT EXISTS db_version (
version INTEGER PRIMARY KEY
)
""")
# Kør versionsbaserede migrationer
_run_versioned_migrations(conn)
# Seed standard-niveauer
count = conn.execute("SELECT COUNT(*) FROM dance_levels").fetchone()[0]
if count == 0:
defaults = [
(1, "Begynder", "Passer til alle"),
(2, "Let øvet", "Lidt erfaring kræves"),
(3, "Øvet", "Kræver regelmæssig træning"),
(4, "Erfaren", "For dedikerede dansere"),
(5, "Ekspert", "Konkurrenceniveau"),
]
for row in defaults:
conn.execute(
"INSERT OR IGNORE INTO dance_levels (sort_order, name, description) VALUES (?,?,?)",
row
)
# ── Versionsbaserede migrationer ──────────────────────────────────────────────
# Tilføj aldrig gamle — tilføj kun nye versioner nederst.
MIGRATIONS: dict[int, list[str]] = {
1: [
"ALTER TABLE songs ADD COLUMN extra_tags TEXT NOT NULL DEFAULT '{}'",
"ALTER TABLE song_dances ADD COLUMN level_id INTEGER REFERENCES dance_levels(id)",
"ALTER TABLE dance_alternatives ADD COLUMN alt_dance_name TEXT NOT NULL DEFAULT ''",
"ALTER TABLE dance_alternatives ADD COLUMN level_id INTEGER REFERENCES dance_levels(id)",
"ALTER TABLE dance_alternatives ADD COLUMN source TEXT NOT NULL DEFAULT 'local'",
"ALTER TABLE dance_alternatives ADD COLUMN created_by TEXT NOT NULL DEFAULT ''",
],
# Eksempel på fremtidig migration:
# 2: ["ALTER TABLE songs ADD COLUMN mbid TEXT"],
}
def _run_versioned_migrations(conn):
"""Kør kun migrationer der ikke allerede er kørt vha. db_version tabel."""
row = conn.execute("SELECT version FROM db_version").fetchone()
current_version = row["version"] if row else 0
for version in sorted(MIGRATIONS.keys()):
if version <= current_version:
continue
for sql in MIGRATIONS[version]:
try:
conn.execute(sql)
except Exception:
pass # kolonnen eksisterer allerede
conn.execute(
"INSERT OR REPLACE INTO db_version (version) VALUES (?)", (version,)
)
# ── Biblioteker ───────────────────────────────────────────────────────────────
def add_library(path: str) -> int:
with get_db() as conn:
cur = conn.execute(
"INSERT OR IGNORE INTO libraries (path) VALUES (?)", (path,)
)
if cur.lastrowid:
return cur.lastrowid
row = conn.execute("SELECT id FROM libraries WHERE path=?", (path,)).fetchone()
return row["id"]
def get_libraries(active_only: bool = True) -> list[sqlite3.Row]:
with get_db() as conn:
if active_only:
return conn.execute(
"SELECT * FROM libraries WHERE is_active=1 ORDER BY path"
).fetchall()
return conn.execute("SELECT * FROM libraries ORDER BY path").fetchall()
def remove_library(library_id: int):
with get_db() as conn:
# Marker sange som manglende
conn.execute(
"UPDATE songs SET file_missing=1 WHERE library_id=?", (library_id,)
)
# Slet biblioteket helt
conn.execute("DELETE FROM libraries WHERE id=?", (library_id,))
def update_library_scan_time(library_id: int):
now = datetime.now(timezone.utc).isoformat()
with get_db() as conn:
conn.execute(
"UPDATE libraries SET last_full_scan=? WHERE id=?", (now, library_id)
)
# ── Sange ─────────────────────────────────────────────────────────────────────
def upsert_song(song_data: dict) -> str:
"""
Indsæt eller opdater en sang baseret på local_path.
Returnerer song_id.
"""
import uuid, json
with get_db() as conn:
existing = conn.execute(
"SELECT id FROM songs WHERE local_path=?", (song_data["local_path"],)
).fetchone()
extra_tags_json = json.dumps(song_data.get("extra_tags", {}), ensure_ascii=False)
if existing:
song_id = existing["id"]
conn.execute("""
UPDATE songs SET
library_id=?, title=?, artist=?, album=?, bpm=?, duration_sec=?,
file_format=?, file_modified_at=?, file_missing=0, extra_tags=?
WHERE id=?
""", (
song_data.get("library_id"),
song_data.get("title", ""),
song_data.get("artist", ""),
song_data.get("album", ""),
song_data.get("bpm", 0),
song_data.get("duration_sec", 0),
song_data.get("file_format", ""),
song_data.get("file_modified_at", ""),
extra_tags_json,
song_id,
))
else:
song_id = str(uuid.uuid4())
conn.execute("""
INSERT INTO songs
(id, library_id, local_path, title, artist, album,
bpm, duration_sec, file_format, file_modified_at, extra_tags)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
""", (
song_id,
song_data.get("library_id"),
song_data["local_path"],
song_data.get("title", ""),
song_data.get("artist", ""),
song_data.get("album", ""),
song_data.get("bpm", 0),
song_data.get("duration_sec", 0),
song_data.get("file_format", ""),
song_data.get("file_modified_at", ""),
extra_tags_json,
))
# Opdater danse hvis de er med i data — bevar level_id og alternativer
if "dances" in song_data:
file_dances = []
for dance in song_data["dances"]:
if isinstance(dance, dict):
file_dances.append(dance.get("name", ""))
else:
file_dances.append(dance)
file_dances = [d for d in file_dances if d]
# Hent eksisterende danse med level_id og alternativer
existing = conn.execute(
"SELECT id, dance_name, dance_order, level_id FROM song_dances "
"WHERE song_id=? ORDER BY dance_order",
(song_id,)
).fetchall()
existing_map = {r["dance_name"].lower(): r for r in existing}
# Slet danse der ikke længere er i filen
file_lower = [d.lower() for d in file_dances]
for row in existing:
if row["dance_name"].lower() not in file_lower:
conn.execute(
"DELETE FROM dance_alternatives WHERE song_dance_id=?", (row["id"],)
)
conn.execute("DELETE FROM song_dances WHERE id=?", (row["id"],))
# Tilføj eller opdater danse fra filen
for i, name in enumerate(file_dances, start=1):
ex = existing_map.get(name.lower())
if ex:
# Bevar level_id — opdater kun dance_order
conn.execute(
"UPDATE song_dances SET dance_order=? WHERE id=?",
(i, ex["id"])
)
else:
# Ny dans — ingen level_id endnu
conn.execute(
"INSERT INTO song_dances (song_id, dance_name, dance_order, level_id) "
"VALUES (?,?,?,NULL)",
(song_id, name, i)
)
return song_id
def mark_song_missing(local_path: str):
with get_db() as conn:
conn.execute(
"UPDATE songs SET file_missing=1 WHERE local_path=?", (local_path,)
)
def get_song_by_path(local_path: str) -> sqlite3.Row | None:
with get_db() as conn:
return conn.execute(
"SELECT * FROM songs WHERE local_path=?", (local_path,)
).fetchone()
def search_songs(query: str, limit: int = 50) -> list[sqlite3.Row]:
"""Søg i alle tags — titel, artist, album, danse og alle øvrige tags."""
pattern = f"%{query}%"
with get_db() as conn:
return conn.execute("""
SELECT DISTINCT s.* FROM songs s
LEFT JOIN song_dances sd ON sd.song_id = s.id
WHERE s.file_missing = 0
AND (
s.title LIKE ? OR
s.artist LIKE ? OR
s.album LIKE ? OR
sd.dance_name LIKE ? OR
s.extra_tags LIKE ?
)
ORDER BY s.artist, s.title
LIMIT ?
""", (pattern, pattern, pattern, pattern, pattern, limit)).fetchall()
def get_songs_for_library(library_id: int) -> list[sqlite3.Row]:
with get_db() as conn:
return conn.execute(
"SELECT * FROM songs WHERE library_id=? ORDER BY artist, title",
(library_id,)
).fetchall()
def get_all_song_paths_for_library(library_id: int) -> dict[str, str]:
"""Returnerer {local_path: file_modified_at} — bruges til fuld scan."""
with get_db() as conn:
rows = conn.execute(
"SELECT local_path, file_modified_at FROM songs WHERE library_id=?",
(library_id,)
).fetchall()
return {row["local_path"]: row["file_modified_at"] for row in rows}
# ── Afspilningslister ─────────────────────────────────────────────────────────
def create_playlist(name: str, description: str = "") -> int:
with get_db() as conn:
cur = conn.execute(
"INSERT INTO playlists (name, description) VALUES (?,?)",
(name, description)
)
return cur.lastrowid
def get_playlists() -> list[sqlite3.Row]:
with get_db() as conn:
return conn.execute(
"SELECT * FROM playlists ORDER BY created_at DESC"
).fetchall()
def add_song_to_playlist(playlist_id: int, song_id: str, position: int | None = None) -> int:
with get_db() as conn:
if position is None:
row = conn.execute(
"SELECT MAX(position) as max_pos FROM playlist_songs WHERE playlist_id=?",
(playlist_id,)
).fetchone()
position = (row["max_pos"] or 0) + 1
cur = conn.execute(
"INSERT INTO playlist_songs (playlist_id, song_id, position) VALUES (?,?,?)",
(playlist_id, song_id, position)
)
return cur.lastrowid
def update_playlist_song_status(playlist_song_id: int, status: str):
valid = {"pending", "playing", "played", "skipped"}
if status not in valid:
raise ValueError(f"Ugyldig status: {status}")
with get_db() as conn:
conn.execute(
"UPDATE playlist_songs SET status=? WHERE id=?",
(status, playlist_song_id)
)
def get_playlist_with_songs(playlist_id: int) -> dict:
with get_db() as conn:
playlist = conn.execute(
"SELECT * FROM playlists WHERE id=?", (playlist_id,)
).fetchone()
if not playlist:
return {}
songs = conn.execute("""
SELECT ps.id as ps_id, ps.position, ps.status,
s.*, GROUP_CONCAT(sd.dance_name ORDER BY sd.dance_order) as dances
FROM playlist_songs ps
JOIN songs s ON s.id = ps.song_id
LEFT JOIN song_dances sd ON sd.song_id = s.id
WHERE ps.playlist_id = ?
GROUP BY ps.id
ORDER BY ps.position
""", (playlist_id,)).fetchall()
return {"playlist": dict(playlist), "songs": [dict(s) for s in songs]}
# ── Event-state (gemmes løbende så man kan genstarte efter strømsvigt) ────────
def save_event_state(current_idx: int, statuses: list[str]):
"""Gem event-fremgang — overskrives ved hver ændring."""
import json
with get_db() as conn:
conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('current_idx',?)",
(str(current_idx),))
conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('statuses',?)",
(json.dumps(statuses),))
def load_event_state() -> tuple[int, list[str]] | None:
"""Indlæs gemt event-fremgang. Returnerer None hvis ingen gemt tilstand."""
import json
with get_db() as conn:
idx_row = conn.execute(
"SELECT value FROM event_state WHERE key='current_idx'"
).fetchone()
sta_row = conn.execute(
"SELECT value FROM event_state WHERE key='statuses'"
).fetchone()
if not idx_row or not sta_row:
return None
return int(idx_row["value"]), json.loads(sta_row["value"])
def clear_event_state():
"""Nulstil gemt event-tilstand (bruges ved 'Start event')."""
with get_db() as conn:
conn.execute("DELETE FROM event_state")
# ── Dans-navne ordbog ─────────────────────────────────────────────────────────
def get_dance_name_suggestions(prefix: str, limit: int = 20) -> list[str]:
"""Returnerer dans-navne der starter med prefix fra alle kendte sources,
sorteret efter popularitet. Inkluderer navne fra song_dances og dance_alternatives."""
with get_db() as conn:
# Hent fra dance_names ordbog (primær kilde)
rows = conn.execute("""
SELECT name, use_count FROM dance_names
WHERE name LIKE ? COLLATE NOCASE
ORDER BY use_count DESC, name
LIMIT ?
""", (f"{prefix}%", limit)).fetchall()
names = {r["name"]: r["use_count"] for r in rows}
# Supplér med navne direkte fra song_dances der ikke er i ordbogen
extra = conn.execute("""
SELECT DISTINCT dance_name as name FROM song_dances
WHERE dance_name LIKE ? COLLATE NOCASE
LIMIT ?
""", (f"{prefix}%", limit)).fetchall()
for r in extra:
if r["name"] not in names:
names[r["name"]] = 0
# Supplér med alternativ-danse
extra2 = conn.execute("""
SELECT DISTINCT alt_dance_name as name FROM dance_alternatives
WHERE alt_dance_name LIKE ? COLLATE NOCASE
LIMIT ?
""", (f"{prefix}%", limit)).fetchall()
for r in extra2:
if r["name"] not in names:
names[r["name"]] = 0
# Sorter: kendte navne med høj use_count først, derefter alfabetisk
return sorted(names.keys(),
key=lambda n: (-names[n], n.lower()))[:limit]
def register_dance_name(name: str, source: str = "local"):
"""Tilføj eller opdater et dans-navn i ordbogen."""
name = name.strip()
if not name:
return
with get_db() as conn:
existing = conn.execute(
"SELECT id, use_count FROM dance_names WHERE name=? COLLATE NOCASE",
(name,)
).fetchone()
if existing:
conn.execute(
"UPDATE dance_names SET use_count=use_count+1 WHERE id=?",
(existing["id"],)
)
else:
conn.execute(
"INSERT INTO dance_names (name, source, use_count) VALUES (?,?,1)",
(name, source)
)
def sync_dance_names_from_api(names: list[dict]):
"""Synkroniser dans-navne fra API — {name, use_count}."""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
with get_db() as conn:
for item in names:
conn.execute("""
INSERT INTO dance_names (name, source, use_count, synced_at)
VALUES (?, 'community', ?, ?)
ON CONFLICT(name) DO UPDATE SET
use_count = MAX(use_count, excluded.use_count),
synced_at = excluded.synced_at
""", (item["name"], item.get("use_count", 1), now))
# ── Dans-niveauer ─────────────────────────────────────────────────────────────
def get_dance_levels() -> list[sqlite3.Row]:
"""Hent alle niveauer sorteret efter sort_order."""
with get_db() as conn:
return conn.execute(
"SELECT * FROM dance_levels ORDER BY sort_order"
).fetchall()
def sync_dance_levels_from_api(levels: list[dict]):
"""Synkroniser niveauer fra API — {sort_order, name, description}."""
from datetime import datetime, timezone
now = datetime.now(timezone.utc).isoformat()
with get_db() as conn:
for lvl in levels:
conn.execute("""
INSERT INTO dance_levels (sort_order, name, description, synced_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
sort_order = excluded.sort_order,
description = excluded.description,
synced_at = excluded.synced_at
""", (lvl["sort_order"], lvl["name"], lvl.get("description", ""), now))
# ── Dans-alternativer ─────────────────────────────────────────────────────────
def get_alternatives_for_dance(song_dance_id: int) -> list[sqlite3.Row]:
with get_db() as conn:
return conn.execute("""
SELECT da.*, dl.name as level_name, dl.sort_order as level_sort
FROM dance_alternatives da
LEFT JOIN dance_levels dl ON dl.id = da.level_id
WHERE da.song_dance_id = ?
ORDER BY da.source, dl.sort_order
""", (song_dance_id,)).fetchall()
def add_alternative(song_dance_id: int, alt_dance_name: str,
level_id: int | None = None, note: str = "",
source: str = "local", created_by: str = "") -> str:
import uuid as _uuid
alt_id = str(_uuid.uuid4())
with get_db() as conn:
conn.execute("""
INSERT INTO dance_alternatives
(id, song_dance_id, alt_dance_name, level_id, note, source, created_by)
VALUES (?,?,?,?,?,?,?)
""", (alt_id, song_dance_id, alt_dance_name.strip(),
level_id, note, source, created_by))
# Registrer alt-dans-navne i ordbogen
register_dance_name(alt_dance_name, source=source)
return alt_id
def remove_alternative(alt_id: str):
with get_db() as conn:
conn.execute("DELETE FROM dance_alternatives WHERE id=?", (alt_id,))

View File

@@ -1,391 +0,0 @@
"""
tag_reader.py — Læser og skriver metadata fra lydfiler.
Understøttede formater og danse-tag support:
MP3 — læs + skriv danse (ID3 TXXX-felter)
FLAC — læs + skriv danse (Vorbis Comments)
OGG — læs + skriv danse (Vorbis Comments)
OPUS — læs + skriv danse (Vorbis Comments)
M4A — læs + skriv danse (MP4 custom felt ----:LINEDANCE:DANCE)
WAV — læs metadata, ingen danse-tag support
WMA — læs metadata, ingen danse-tag support
AIFF — læs metadata, ingen danse-tag support
Danse gemmes ALTID i SQLite uanset format.
Fil-skrivning er kun muligt for de formater der understøtter custom tags.
"""
import os
from datetime import datetime, timezone
from pathlib import Path
try:
from mutagen import File as MutagenFile
from mutagen.id3 import ID3, TXXX
from mutagen.flac import FLAC
from mutagen.mp4 import MP4, MP4FreeForm
MUTAGEN_AVAILABLE = True
except ImportError:
MUTAGEN_AVAILABLE = False
print("Advarsel: mutagen ikke installeret — tag-læsning deaktiveret")
# Filtyper vi høster metadata fra
SUPPORTED_EXTENSIONS = {
".mp3", ".flac", ".wav", ".m4a", ".aac",
".ogg", ".opus", ".wma", ".aiff", ".aif",
}
# Formater der understøtter skrivning af danse-tags til fil
WRITABLE_DANCE_FORMATS = {".mp3", ".flac", ".ogg", ".opus", ".m4a"}
# Tag-nøgler brugt på tværs af formater
TXXX_DANCE_PREFIX = "LINEDANCE_DANCE_" # MP3: TXXX:LINEDANCE_DANCE_1
VORBIS_DANCE_KEY = "linedance_dance" # FLAC/OGG: linedance_dance.1
M4A_DANCE_FREEFORM = "----:LINEDANCE:DANCE" # M4A: ----:LINEDANCE:DANCE (liste)
def is_supported(path: str | Path) -> bool:
return Path(path).suffix.lower() in SUPPORTED_EXTENSIONS
def can_write_dances(path: str | Path) -> bool:
"""Returnerer True hvis formatet understøtter skrivning af danse-tags til fil."""
return Path(path).suffix.lower() in WRITABLE_DANCE_FORMATS
def get_file_modified_at(path: str | Path) -> str:
ts = os.path.getmtime(str(path))
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
# ── Læsning ───────────────────────────────────────────────────────────────────
def read_tags(path: str | Path) -> dict:
"""
Læser metadata og danse fra en lydfil.
Returnerer dict med: title, artist, album, bpm, duration_sec,
file_format, file_modified_at, dances, can_write_dances,
extra_tags (dict med alle øvrige tags som {navn: værdi}).
"""
path = Path(path)
result = {
"local_path": str(path),
"title": path.stem,
"artist": "",
"album": "",
"bpm": 0,
"duration_sec": 0,
"file_format": path.suffix.lower().lstrip("."),
"file_modified_at": get_file_modified_at(path),
"dances": [],
"can_write_dances": can_write_dances(path),
"extra_tags": {},
}
if not MUTAGEN_AVAILABLE:
return result
try:
audio = MutagenFile(str(path), easy=False)
if audio is None:
return result
if hasattr(audio, "info") and audio.info:
result["duration_sec"] = int(getattr(audio.info, "length", 0))
ext = path.suffix.lower()
if ext == ".mp3":
_read_mp3(audio, result)
elif ext == ".flac":
_read_vorbis(audio, result)
elif ext in (".ogg", ".opus"):
_read_vorbis(audio, result)
elif ext in (".m4a", ".aac", ".mp4"):
_read_m4a(audio, result)
else:
_read_generic(audio, result)
except Exception as e:
print(f"Fejl ved læsning af {path}: {e}")
return result
def _read_mp3(audio, result: dict):
tags = audio.tags
if not tags:
return
if "TIT2" in tags:
result["title"] = str(tags["TIT2"].text[0])
if "TPE1" in tags:
result["artist"] = str(tags["TPE1"].text[0])
if "TALB" in tags:
result["album"] = str(tags["TALB"].text[0])
if "TBPM" in tags:
try:
result["bpm"] = int(float(str(tags["TBPM"].text[0])))
except (ValueError, TypeError):
pass
dances = {}
extra = {}
# Kendte ID3-felt-navne til menneskelige navne
ID3_NAMES = {
"TIT2": "titel", "TPE1": "artist", "TALB": "album", "TBPM": "bpm",
"TYER": "år", "TDRC": "dato", "TCON": "genre", "TPE2": "albumartist",
"TPOS": "disknummer", "TRCK": "spornummer", "TCOM": "komponist",
"TLYR": "sangtekst", "TCOP": "copyright", "TPUB": "udgiver",
"TENC": "kodet_af", "TLAN": "sprog", "TMOO": "stemning",
"TPE3": "dirigent", "TPE4": "fortolket_af", "TOAL": "original_album",
"TOPE": "original_artist", "TORY": "original_år",
}
for key, frame in tags.items():
if key.startswith("TXXX:") and TXXX_DANCE_PREFIX in key:
try:
num = int(key.replace(f"TXXX:{TXXX_DANCE_PREFIX}", ""))
dances[num] = str(frame.text[0])
except (ValueError, IndexError):
pass
elif key.startswith("TXXX:"):
# Custom TXXX-felt — gem under dets beskrivelse
desc = key[5:] # fjern "TXXX:"
try:
extra[desc] = str(frame.text[0])
except Exception:
pass
elif key in ID3_NAMES and key not in ("TIT2","TPE1","TALB","TBPM"):
# Standardfelt vi ikke allerede har gemt
try:
val = str(frame.text[0]) if hasattr(frame, "text") else str(frame)
if val:
extra[ID3_NAMES[key]] = val
except Exception:
pass
elif hasattr(frame, "text") and key not in ("TIT2","TPE1","TALB","TBPM"):
# Alle andre tekstfelter
try:
val = str(frame.text[0])
if val and not key.startswith("APIC"): # spring albumcover over
extra[key] = val
except Exception:
pass
result["dances"] = [dances[k] for k in sorted(dances.keys())]
result["extra_tags"] = extra
def _read_vorbis(audio, result: dict):
"""FLAC og OGG/Opus bruger begge Vorbis Comments."""
tags = audio.tags
if not tags:
return
result["title"] = tags.get("title", [result["title"]])[0]
result["artist"] = tags.get("artist", [""])[0]
result["album"] = tags.get("album", [""])[0]
try:
result["bpm"] = int(tags.get("bpm", [0])[0])
except (ValueError, TypeError):
pass
# Danse
dances = {}
for key, values in tags.items():
if key.lower().startswith(f"{VORBIS_DANCE_KEY}."):
try:
num = int(key.split(".")[-1])
dances[num] = values[0]
except (ValueError, IndexError):
pass
if not dances and VORBIS_DANCE_KEY in tags:
result["dances"] = [d.strip() for d in tags[VORBIS_DANCE_KEY][0].split(",") if d.strip()]
else:
result["dances"] = [dances[k] for k in sorted(dances.keys())]
# Alle øvrige tags som extra_tags
skip = {"title", "artist", "album", "bpm", VORBIS_DANCE_KEY}
extra = {}
for key, values in tags.items():
k = key.lower()
if k not in skip and not k.startswith(VORBIS_DANCE_KEY):
try:
extra[k] = str(values[0])
except Exception:
pass
result["extra_tags"] = extra
def _read_m4a(audio, result: dict):
tags = audio.tags
if not tags:
return
if "\xa9nam" in tags:
result["title"] = str(tags["\xa9nam"][0])
if "\xa9ART" in tags:
result["artist"] = str(tags["\xa9ART"][0])
if "\xa9alb" in tags:
result["album"] = str(tags["\xa9alb"][0])
if "tmpo" in tags:
try:
result["bpm"] = int(tags["tmpo"][0])
except (ValueError, TypeError):
pass
if M4A_DANCE_FREEFORM in tags:
result["dances"] = [
v.decode("utf-8") if isinstance(v, (bytes, MP4FreeForm)) else str(v)
for v in tags[M4A_DANCE_FREEFORM]
]
# Menneskelige navne til M4A-nøgler
M4A_NAMES = {
"\xa9nam": "titel", "\xa9ART": "artist", "\xa9alb": "album",
"\xa9day": "år", "\xa9gen": "genre", "\xa9wrt": "komponist",
"\xa9cmt": "kommentar", "aART": "albumartist", "trkn": "spornummer",
"disk": "disknummer", "cprt": "copyright", "\xa9lyr": "sangtekst",
"tmpo": "bpm",
}
skip_keys = {"\xa9nam", "\xa9ART", "\xa9alb", "tmpo", M4A_DANCE_FREEFORM, "covr"}
extra = {}
for key, values in tags.items():
if key in skip_keys:
continue
label = M4A_NAMES.get(key, key)
try:
val = values[0]
if isinstance(val, (bytes, MP4FreeForm)):
val = val.decode("utf-8", errors="replace")
extra[label] = str(val)
except Exception:
pass
result["extra_tags"] = extra
def _read_generic(audio, result: dict):
try:
easy = MutagenFile(result["local_path"], easy=True)
if easy and easy.tags:
result["title"] = easy.tags.get("title", [result["title"]])[0]
result["artist"] = easy.tags.get("artist", [""])[0]
result["album"] = easy.tags.get("album", [""])[0]
except Exception:
pass
# ── Skrivning ─────────────────────────────────────────────────────────────────
def write_dances(path: str | Path, dances: list[str]) -> bool:
"""
Skriver danse til filen hvis formatet understøtter det.
Returnerer True ved succes, False hvis formatet ikke understøtter det.
Kaster Exception ved fejl under skrivning.
"""
if not MUTAGEN_AVAILABLE:
return False
path = Path(path)
ext = path.suffix.lower()
if ext not in WRITABLE_DANCE_FORMATS:
return False
if ext == ".mp3":
return _write_mp3_dances(path, dances)
elif ext in (".flac", ".ogg", ".opus"):
return _write_vorbis_dances(path, dances)
elif ext in (".m4a", ".aac"):
return _write_m4a_dances(path, dances)
return False
def _write_mp3_dances(path: Path, dances: list[str]) -> bool:
try:
tags = ID3(str(path))
for key in [k for k in tags.keys() if TXXX_DANCE_PREFIX in k]:
del tags[key]
for i, name in enumerate(dances, start=1):
tags.add(TXXX(encoding=3, desc=f"{TXXX_DANCE_PREFIX}{i}", text=name))
tags.save(str(path))
return True
except Exception as e:
print(f"MP3 skrivefejl {path}: {e}")
return False
def _write_vorbis_dances(path: Path, dances: list[str]) -> bool:
try:
audio = MutagenFile(str(path), easy=False)
if audio is None or audio.tags is None:
return False
# Slet eksisterende danse-felter
keys_to_delete = [k for k in audio.tags.keys() if k.lower().startswith(f"{VORBIS_DANCE_KEY}.")]
for key in keys_to_delete:
del audio.tags[key]
# Skriv nye — ét felt per dans
for i, name in enumerate(dances, start=1):
audio.tags[f"{VORBIS_DANCE_KEY}.{i}"] = name
audio.save()
return True
except Exception as e:
print(f"Vorbis skrivefejl {path}: {e}")
return False
def _write_m4a_dances(path: Path, dances: list[str]) -> bool:
try:
audio = MP4(str(path))
audio.tags[M4A_DANCE_FREEFORM] = [
MP4FreeForm(name.encode("utf-8")) for name in dances
]
audio.save()
return True
except Exception as e:
print(f"M4A skrivefejl {path}: {e}")
return False
# ── Hurtig læsning af kun danse (uden fuld tag-scan) ─────────────────────────
def read_dances_from_file(path: str | Path) -> list[str]:
"""Læser kun danse fra en fil — hurtigere end fuld read_tags()."""
tags = read_tags(path)
return tags.get("dances", [])
# ── BPM-analyse ───────────────────────────────────────────────────────────────
def analyze_bpm(path: str | Path) -> float | None:
"""
Analysér BPM fra lydfilen ved hjælp af librosa.
Returnerer BPM som float eller None ved fejl.
Tager 2-5 sekunder per sang — kør i baggrundstråd.
"""
try:
import librosa
# Indlæs kun de første 60 sekunder for hastighed
y, sr = librosa.load(str(path), duration=60.0, mono=True)
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
# librosa returnerer array i nyere versioner
if hasattr(tempo, "__len__"):
bpm = float(tempo[0]) if len(tempo) > 0 else 0.0
else:
bpm = float(tempo)
return round(bpm, 1) if bpm > 0 else None
except ImportError:
print("librosa ikke installeret — installer med: pip install librosa")
return None
except Exception as e:
print(f"BPM-analyse fejl for {path}: {e}")
return None
def analyze_and_save_bpm(path: str | Path, song_id: str) -> float | None:
"""Analysér BPM og gem i SQLite. Returnerer målt BPM."""
bpm = analyze_bpm(path)
if bpm and bpm > 0:
try:
from local.local_db import get_db
with get_db() as conn:
conn.execute(
"UPDATE songs SET bpm=? WHERE id=? AND (bpm IS NULL OR bpm=0)",
(int(round(bpm)), song_id)
)
except Exception as e:
print(f"BPM gem fejl: {e}")
return bpm