Start
This commit is contained in:
29
linedance-app/local/__init__.py
Normal file
29
linedance-app/local/__init__.py
Normal file
@@ -0,0 +1,29 @@
|
||||
"""
|
||||
local/ — Lokalt data-lag til Linedance-afspilleren.
|
||||
|
||||
Moduler:
|
||||
local_db.py — SQLite database (sange, afspilningslister, biblioteker)
|
||||
tag_reader.py — Læser/skriver metadata fra lydfiler
|
||||
file_watcher.py — Overvåger mapper og holder SQLite opdateret
|
||||
|
||||
Typisk brug ved app-start:
|
||||
|
||||
from local.local_db import init_db
|
||||
from local.file_watcher import get_watcher
|
||||
|
||||
# Initialiser database
|
||||
init_db()
|
||||
|
||||
# Start fil-overvågning (on_change kaldes ved ændringer — opdater GUI)
|
||||
def on_file_change(event_type, path, song_id):
|
||||
print(f"{event_type}: {path}")
|
||||
|
||||
watcher = get_watcher(on_change=on_file_change)
|
||||
watcher.start()
|
||||
|
||||
# Tilføj et bibliotek (scanner automatisk + starter overvågning)
|
||||
watcher.add_library("/home/carsten/Musik")
|
||||
|
||||
# Ved app-luk:
|
||||
watcher.stop()
|
||||
"""
|
||||
274
linedance-app/local/file_watcher.py
Normal file
274
linedance-app/local/file_watcher.py
Normal file
@@ -0,0 +1,274 @@
|
||||
"""
|
||||
file_watcher.py — Overvåger musikbiblioteker og holder SQLite opdateret.
|
||||
|
||||
Bruger watchdog til at reagere på fil-ændringer i realtid.
|
||||
Kører fuld scan ved opstart for at fange ændringer lavet mens appen var lukket.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import time
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import Callable
|
||||
|
||||
try:
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.events import (
|
||||
FileSystemEventHandler,
|
||||
FileCreatedEvent,
|
||||
FileModifiedEvent,
|
||||
FileDeletedEvent,
|
||||
FileMovedEvent,
|
||||
)
|
||||
WATCHDOG_AVAILABLE = True
|
||||
except ImportError:
|
||||
WATCHDOG_AVAILABLE = False
|
||||
print("Advarsel: watchdog ikke installeret — fil-overvågning deaktiveret")
|
||||
|
||||
from local.tag_reader import is_supported, read_tags, get_file_modified_at
|
||||
from local.local_db import (
|
||||
get_libraries, add_library, remove_library,
|
||||
upsert_song, mark_song_missing,
|
||||
get_all_song_paths_for_library, update_library_scan_time,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MusicLibraryHandler(FileSystemEventHandler):
|
||||
"""
|
||||
Reagerer på ændringer i et musikbibliotek.
|
||||
Kører i watchdog's baggrundstråd — DB-operationer er thread-safe via WAL.
|
||||
"""
|
||||
|
||||
def __init__(self, library_id: int, on_change: Callable | None = None):
|
||||
self.library_id = library_id
|
||||
self.on_change = on_change # valgfrit callback til GUI-opdatering
|
||||
self._debounce: dict[str, float] = {}
|
||||
self._debounce_lock = threading.Lock()
|
||||
|
||||
def _debounced(self, path: str) -> bool:
|
||||
"""
|
||||
Forhindrer at samme fil behandles flere gange på kort tid.
|
||||
Nogle programmer gemmer filer i flere trin (temp-fil → rename).
|
||||
"""
|
||||
now = time.time()
|
||||
with self._debounce_lock:
|
||||
last = self._debounce.get(path, 0)
|
||||
if now - last < 1.5: # 1.5 sekunder cooldown
|
||||
return False
|
||||
self._debounce[path] = now
|
||||
return True
|
||||
|
||||
def on_created(self, event):
|
||||
if event.is_directory or not is_supported(event.src_path):
|
||||
return
|
||||
if self._debounced(event.src_path):
|
||||
self._process_file(event.src_path)
|
||||
|
||||
def on_modified(self, event):
|
||||
if event.is_directory or not is_supported(event.src_path):
|
||||
return
|
||||
if self._debounced(event.src_path):
|
||||
self._process_file(event.src_path)
|
||||
|
||||
def on_deleted(self, event):
|
||||
if event.is_directory or not is_supported(event.src_path):
|
||||
return
|
||||
logger.info(f"Fil slettet: {event.src_path}")
|
||||
mark_song_missing(event.src_path)
|
||||
if self.on_change:
|
||||
self.on_change("deleted", event.src_path, None)
|
||||
|
||||
def on_moved(self, event):
|
||||
if event.is_directory:
|
||||
return
|
||||
# Behandl som slet + opret
|
||||
if is_supported(event.src_path):
|
||||
mark_song_missing(event.src_path)
|
||||
if is_supported(event.dest_path):
|
||||
if self._debounced(event.dest_path):
|
||||
self._process_file(event.dest_path)
|
||||
|
||||
def _process_file(self, path: str):
|
||||
"""Læs tags og gem i SQLite."""
|
||||
try:
|
||||
logger.debug(f"Høster tags fra: {path}")
|
||||
tags = read_tags(path)
|
||||
tags["library_id"] = self.library_id
|
||||
song_id = upsert_song(tags)
|
||||
logger.info(f"Opdateret: {Path(path).name} ({len(tags.get('dances', []))} danse)")
|
||||
if self.on_change:
|
||||
self.on_change("upserted", path, song_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Fejl ved behandling af {path}: {e}")
|
||||
|
||||
|
||||
class LibraryWatcher:
|
||||
"""
|
||||
Styrer watchdog-observere for alle aktive musikbiblioteker.
|
||||
Én instans per applikation.
|
||||
"""
|
||||
|
||||
def __init__(self, on_change: Callable | None = None):
|
||||
self.on_change = on_change
|
||||
self._observer: Observer | None = None
|
||||
self._running = False
|
||||
|
||||
def start(self):
|
||||
"""Start overvågning af alle aktive biblioteker + kør fuld scan."""
|
||||
if not WATCHDOG_AVAILABLE:
|
||||
logger.warning("watchdog ikke tilgængelig — starter kun fuld scan")
|
||||
self._full_scan_all()
|
||||
return
|
||||
|
||||
self._observer = Observer()
|
||||
libraries = get_libraries(active_only=True)
|
||||
|
||||
for lib in libraries:
|
||||
path = Path(lib["path"])
|
||||
if not path.exists():
|
||||
logger.warning(f"Bibliotek findes ikke: {path}")
|
||||
continue
|
||||
|
||||
handler = MusicLibraryHandler(lib["id"], self.on_change)
|
||||
self._observer.schedule(handler, str(path), recursive=True)
|
||||
logger.info(f"Overvåger: {path}")
|
||||
|
||||
self._observer.start()
|
||||
self._running = True
|
||||
|
||||
# Fuld scan i baggrundstråd så GUI ikke blokeres
|
||||
threading.Thread(target=self._full_scan_all, daemon=True).start()
|
||||
|
||||
def stop(self):
|
||||
if self._observer and self._running:
|
||||
self._observer.stop()
|
||||
self._observer.join()
|
||||
self._running = False
|
||||
|
||||
def add_library(self, path: str) -> int:
|
||||
"""Tilføj et nyt bibliotek og start overvågning af det med det samme."""
|
||||
library_id = add_library(path)
|
||||
|
||||
if self._observer and self._running:
|
||||
handler = MusicLibraryHandler(library_id, self.on_change)
|
||||
self._observer.schedule(handler, path, recursive=True)
|
||||
logger.info(f"Tilføjet bibliotek: {path}")
|
||||
|
||||
# Scan det nye bibliotek i baggrunden
|
||||
threading.Thread(
|
||||
target=self._full_scan_library,
|
||||
args=(library_id, path),
|
||||
daemon=True,
|
||||
).start()
|
||||
|
||||
return library_id
|
||||
|
||||
def remove_library(self, library_id: int):
|
||||
"""Deaktiver bibliotek. Watchdog stopper automatisk ved næste restart."""
|
||||
remove_library(library_id)
|
||||
# Genstart observer for at fjerne watch (watchdog understøtter ikke unschedule by id)
|
||||
if self._observer and self._running:
|
||||
self._observer.unschedule_all()
|
||||
self._reschedule_all()
|
||||
|
||||
def _reschedule_all(self):
|
||||
"""Genplanlæg alle aktive biblioteker på observeren."""
|
||||
for lib in get_libraries(active_only=True):
|
||||
path = Path(lib["path"])
|
||||
if path.exists():
|
||||
handler = MusicLibraryHandler(lib["id"], self.on_change)
|
||||
self._observer.schedule(handler, str(path), recursive=True)
|
||||
|
||||
def _full_scan_all(self):
|
||||
"""Kør fuld scan på alle aktive biblioteker."""
|
||||
for lib in get_libraries(active_only=True):
|
||||
path = Path(lib["path"])
|
||||
if path.exists():
|
||||
self._full_scan_library(lib["id"], str(path))
|
||||
|
||||
def _full_scan_library(self, library_id: int, library_path: str):
|
||||
"""
|
||||
Sammenligner filer på disk med SQLite og synkroniserer forskelle.
|
||||
Håndterer utilgængelige mapper og symlinks sikkert.
|
||||
"""
|
||||
logger.info(f"Fuld scan starter: {library_path}")
|
||||
base = Path(library_path)
|
||||
|
||||
# Tjek at mappen faktisk er tilgængelig — med timeout
|
||||
if not self._path_accessible(base):
|
||||
logger.warning(f"Bibliotek ikke tilgængeligt (timeout eller ingen adgang): {library_path}")
|
||||
return
|
||||
|
||||
known = get_all_song_paths_for_library(library_id)
|
||||
found_paths = set()
|
||||
processed = 0
|
||||
errors = 0
|
||||
|
||||
import os
|
||||
for dirpath, dirnames, filenames in os.walk(
|
||||
str(base), followlinks=False,
|
||||
onerror=lambda e: logger.warning(f"Adgang nægtet: {e}")
|
||||
):
|
||||
for filename in filenames:
|
||||
file_path = Path(dirpath) / filename
|
||||
try:
|
||||
if not is_supported(file_path):
|
||||
continue
|
||||
path_str = str(file_path)
|
||||
found_paths.add(path_str)
|
||||
disk_modified = get_file_modified_at(file_path)
|
||||
|
||||
if path_str not in known or known[path_str] != disk_modified:
|
||||
tags = read_tags(file_path)
|
||||
tags["library_id"] = library_id
|
||||
upsert_song(tags)
|
||||
processed += 1
|
||||
if self.on_change:
|
||||
self.on_change("upserted", path_str, None)
|
||||
except Exception as e:
|
||||
logger.error(f"Scan-fejl for {file_path}: {e}")
|
||||
errors += 1
|
||||
|
||||
# Marker forsvundne filer
|
||||
missing_count = 0
|
||||
for known_path in known:
|
||||
if known_path not in found_paths:
|
||||
mark_song_missing(known_path)
|
||||
missing_count += 1
|
||||
if self.on_change:
|
||||
self.on_change("deleted", known_path, None)
|
||||
|
||||
update_library_scan_time(library_id)
|
||||
logger.info(
|
||||
f"Scan færdig: {library_path} — "
|
||||
f"{processed} opdateret, {missing_count} mangler, {errors} fejl"
|
||||
)
|
||||
|
||||
def _path_accessible(self, path: Path, timeout_sec: float = 5.0) -> bool:
|
||||
"""Tjek om en sti er tilgængelig inden for timeout."""
|
||||
import threading
|
||||
result = [False]
|
||||
def check():
|
||||
try:
|
||||
result[0] = path.exists() and path.is_dir()
|
||||
except Exception:
|
||||
result[0] = False
|
||||
t = threading.Thread(target=check, daemon=True)
|
||||
t.start()
|
||||
t.join(timeout=timeout_sec)
|
||||
return result[0]
|
||||
|
||||
|
||||
# ── Singleton til brug i appen ────────────────────────────────────────────────
|
||||
|
||||
_watcher: LibraryWatcher | None = None
|
||||
|
||||
|
||||
def get_watcher(on_change: Callable | None = None) -> LibraryWatcher:
|
||||
"""Returnerer den globale LibraryWatcher-instans."""
|
||||
global _watcher
|
||||
if _watcher is None:
|
||||
_watcher = LibraryWatcher(on_change=on_change)
|
||||
return _watcher
|
||||
688
linedance-app/local/local_db.py
Normal file
688
linedance-app/local/local_db.py
Normal file
@@ -0,0 +1,688 @@
|
||||
"""
|
||||
local_db.py — Lokal SQLite database til offline brug.
|
||||
|
||||
Håndterer:
|
||||
- Musikbiblioteker (stier der overvåges)
|
||||
- Sange høstet fra filsystemet
|
||||
- Lokale afspilningslister (offline-projekter)
|
||||
- Synkroniseringsstatus mod API
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import threading
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
DB_PATH = Path.home() / ".linedance" / "local.db"
|
||||
|
||||
_local = threading.local()
|
||||
_global_conn: sqlite3.Connection | None = None
|
||||
|
||||
|
||||
def _get_conn() -> sqlite3.Connection:
|
||||
"""Returnerer en global forbindelse i autocommit mode."""
|
||||
global _global_conn
|
||||
if _global_conn is None:
|
||||
DB_PATH.parent.mkdir(parents=True, exist_ok=True)
|
||||
_global_conn = sqlite3.connect(str(DB_PATH), check_same_thread=False,
|
||||
isolation_level=None) # autocommit
|
||||
_global_conn.row_factory = sqlite3.Row
|
||||
_global_conn.execute("PRAGMA journal_mode=WAL")
|
||||
_global_conn.execute("PRAGMA foreign_keys=ON")
|
||||
return _global_conn
|
||||
|
||||
|
||||
def new_conn() -> sqlite3.Connection:
|
||||
"""Åbn en frisk forbindelse til brug i tag_editor og dialogs."""
|
||||
conn = sqlite3.connect(str(DB_PATH), check_same_thread=False)
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.execute("PRAGMA foreign_keys=OFF") # FK checker forhindrer level_id gem
|
||||
return conn
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_db():
|
||||
"""Context manager der bruger app-forbindelsen i autocommit mode.
|
||||
Hver statement committer med det samme — ingen eksplicit transaktion."""
|
||||
conn = _get_conn()
|
||||
try:
|
||||
yield conn
|
||||
except Exception:
|
||||
raise
|
||||
|
||||
|
||||
def get_db_raw() -> sqlite3.Connection:
|
||||
return _get_conn()
|
||||
|
||||
|
||||
def init_db():
|
||||
"""Opret alle tabeller hvis de ikke findes."""
|
||||
conn = _get_conn()
|
||||
|
||||
# executescript committer automatisk og nulstiller isolation_level
|
||||
# Kør det direkte på den underliggende connection
|
||||
conn.executescript("""
|
||||
CREATE TABLE IF NOT EXISTS libraries (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
path TEXT NOT NULL UNIQUE,
|
||||
is_active INTEGER NOT NULL DEFAULT 1,
|
||||
last_full_scan TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS songs (
|
||||
id TEXT PRIMARY KEY,
|
||||
library_id INTEGER REFERENCES libraries(id),
|
||||
local_path TEXT NOT NULL UNIQUE,
|
||||
title TEXT NOT NULL DEFAULT '',
|
||||
artist TEXT NOT NULL DEFAULT '',
|
||||
album TEXT NOT NULL DEFAULT '',
|
||||
bpm INTEGER NOT NULL DEFAULT 0,
|
||||
duration_sec INTEGER NOT NULL DEFAULT 0,
|
||||
file_format TEXT NOT NULL DEFAULT '',
|
||||
file_modified_at TEXT NOT NULL,
|
||||
file_missing INTEGER NOT NULL DEFAULT 0,
|
||||
extra_tags TEXT NOT NULL DEFAULT '{}',
|
||||
api_song_id TEXT,
|
||||
last_synced_at TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS dance_levels (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
sort_order INTEGER NOT NULL,
|
||||
name TEXT NOT NULL UNIQUE,
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
synced_at TEXT
|
||||
);
|
||||
|
||||
-- Dans-entitet: navn + niveau er unik kombination
|
||||
CREATE TABLE IF NOT EXISTS dances (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL COLLATE NOCASE,
|
||||
level_id INTEGER REFERENCES dance_levels(id),
|
||||
use_count INTEGER NOT NULL DEFAULT 1,
|
||||
source TEXT NOT NULL DEFAULT 'local',
|
||||
synced_at TEXT,
|
||||
UNIQUE(name, level_id)
|
||||
);
|
||||
|
||||
-- Hoveddanse på en sang
|
||||
CREATE TABLE IF NOT EXISTS song_dances (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,
|
||||
dance_id INTEGER NOT NULL REFERENCES dances(id),
|
||||
dance_order INTEGER NOT NULL DEFAULT 1,
|
||||
UNIQUE(song_id, dance_id)
|
||||
);
|
||||
|
||||
-- Alternativ-danse på en sang
|
||||
CREATE TABLE IF NOT EXISTS song_alt_dances (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,
|
||||
dance_id INTEGER NOT NULL REFERENCES dances(id),
|
||||
note TEXT NOT NULL DEFAULT '',
|
||||
source TEXT NOT NULL DEFAULT 'local',
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
UNIQUE(song_id, dance_id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS playlists (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
description TEXT NOT NULL DEFAULT '',
|
||||
api_project_id TEXT,
|
||||
last_synced_at TEXT,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS playlist_songs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
playlist_id INTEGER NOT NULL REFERENCES playlists(id) ON DELETE CASCADE,
|
||||
song_id TEXT NOT NULL REFERENCES songs(id),
|
||||
position INTEGER NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending',
|
||||
UNIQUE(playlist_id, position)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS sync_queue (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
entity_type TEXT NOT NULL,
|
||||
entity_id TEXT NOT NULL,
|
||||
action TEXT NOT NULL,
|
||||
payload TEXT NOT NULL,
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS event_state (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_songs_title ON songs(title);
|
||||
CREATE INDEX IF NOT EXISTS idx_songs_artist ON songs(artist);
|
||||
CREATE INDEX IF NOT EXISTS idx_songs_missing ON songs(file_missing);
|
||||
CREATE INDEX IF NOT EXISTS idx_songs_library ON songs(library_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_song_dances ON song_dances(song_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_song_alt_dances ON song_alt_dances(song_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_dances_name ON dances(name);
|
||||
""")
|
||||
|
||||
# executescript slår foreign_keys fra — genaktiver
|
||||
conn.execute("PRAGMA foreign_keys=ON")
|
||||
|
||||
# Tilføj db_version tabel hvis den ikke findes
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS db_version (
|
||||
version INTEGER PRIMARY KEY
|
||||
)
|
||||
""")
|
||||
|
||||
# Kør versionsbaserede migrationer
|
||||
_run_versioned_migrations(conn)
|
||||
|
||||
# Seed standard-niveauer
|
||||
count = conn.execute("SELECT COUNT(*) FROM dance_levels").fetchone()[0]
|
||||
if count == 0:
|
||||
defaults = [
|
||||
(1, "Begynder", "Passer til alle"),
|
||||
(2, "Let øvet", "Lidt erfaring kræves"),
|
||||
(3, "Øvet", "Kræver regelmæssig træning"),
|
||||
(4, "Erfaren", "For dedikerede dansere"),
|
||||
(5, "Ekspert", "Konkurrenceniveau"),
|
||||
]
|
||||
for row in defaults:
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO dance_levels (sort_order, name, description) VALUES (?,?,?)",
|
||||
row
|
||||
)
|
||||
|
||||
|
||||
# ── Versionsbaserede migrationer ──────────────────────────────────────────────
|
||||
# Tilføj aldrig gamle — tilføj kun nye versioner nederst.
|
||||
|
||||
MIGRATIONS: dict[int, list[str]] = {
|
||||
1: [
|
||||
"ALTER TABLE songs ADD COLUMN extra_tags TEXT NOT NULL DEFAULT '{}'",
|
||||
],
|
||||
2: [
|
||||
# Ny dans-entitet model
|
||||
"""CREATE TABLE IF NOT EXISTS dances (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL COLLATE NOCASE,
|
||||
level_id INTEGER REFERENCES dance_levels(id),
|
||||
use_count INTEGER NOT NULL DEFAULT 1,
|
||||
source TEXT NOT NULL DEFAULT 'local',
|
||||
synced_at TEXT,
|
||||
UNIQUE(name, level_id)
|
||||
)""",
|
||||
"""CREATE TABLE IF NOT EXISTS song_alt_dances (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
song_id TEXT NOT NULL REFERENCES songs(id) ON DELETE CASCADE,
|
||||
dance_id INTEGER NOT NULL REFERENCES dances(id),
|
||||
note TEXT NOT NULL DEFAULT '',
|
||||
source TEXT NOT NULL DEFAULT 'local',
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
UNIQUE(song_id, dance_id)
|
||||
)""",
|
||||
# Migrer eksisterende song_dances data til ny model
|
||||
# (kører kun på ældre databaser der har dance_name kolonnen)
|
||||
"""INSERT OR IGNORE INTO dances (name, level_id, source)
|
||||
SELECT DISTINCT dance_name, level_id, 'local'
|
||||
FROM song_dances WHERE dance_name IS NOT NULL AND dance_name != ''""",
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
def _run_versioned_migrations(conn):
|
||||
"""Kør kun migrationer der ikke allerede er kørt vha. db_version tabel."""
|
||||
row = conn.execute("SELECT version FROM db_version").fetchone()
|
||||
current_version = row["version"] if row else 0
|
||||
|
||||
for version in sorted(MIGRATIONS.keys()):
|
||||
if version <= current_version:
|
||||
continue
|
||||
for sql in MIGRATIONS[version]:
|
||||
try:
|
||||
conn.execute(sql)
|
||||
except Exception:
|
||||
pass # kolonnen eksisterer allerede
|
||||
conn.execute(
|
||||
"INSERT OR REPLACE INTO db_version (version) VALUES (?)", (version,)
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# ── Biblioteker ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def add_library(path: str) -> int:
|
||||
with get_db() as conn:
|
||||
cur = conn.execute(
|
||||
"INSERT OR IGNORE INTO libraries (path) VALUES (?)", (path,)
|
||||
)
|
||||
if cur.lastrowid:
|
||||
return cur.lastrowid
|
||||
row = conn.execute("SELECT id FROM libraries WHERE path=?", (path,)).fetchone()
|
||||
return row["id"]
|
||||
|
||||
|
||||
def get_libraries(active_only: bool = True) -> list[sqlite3.Row]:
|
||||
with get_db() as conn:
|
||||
if active_only:
|
||||
return conn.execute(
|
||||
"SELECT * FROM libraries WHERE is_active=1 ORDER BY path"
|
||||
).fetchall()
|
||||
return conn.execute("SELECT * FROM libraries ORDER BY path").fetchall()
|
||||
|
||||
|
||||
def remove_library(library_id: int):
|
||||
with get_db() as conn:
|
||||
# Marker sange som manglende
|
||||
conn.execute(
|
||||
"UPDATE songs SET file_missing=1 WHERE library_id=?", (library_id,)
|
||||
)
|
||||
# Slet biblioteket helt
|
||||
conn.execute("DELETE FROM libraries WHERE id=?", (library_id,))
|
||||
|
||||
|
||||
def update_library_scan_time(library_id: int):
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
with get_db() as conn:
|
||||
conn.execute(
|
||||
"UPDATE libraries SET last_full_scan=? WHERE id=?", (now, library_id)
|
||||
)
|
||||
|
||||
|
||||
# ── Sange ─────────────────────────────────────────────────────────────────────
|
||||
|
||||
def upsert_song(song_data: dict) -> str:
|
||||
"""
|
||||
Indsæt eller opdater en sang baseret på local_path.
|
||||
Returnerer song_id.
|
||||
"""
|
||||
import uuid, json
|
||||
with get_db() as conn:
|
||||
existing = conn.execute(
|
||||
"SELECT id FROM songs WHERE local_path=?", (song_data["local_path"],)
|
||||
).fetchone()
|
||||
|
||||
extra_tags_json = json.dumps(song_data.get("extra_tags", {}), ensure_ascii=False)
|
||||
|
||||
if existing:
|
||||
song_id = existing["id"]
|
||||
conn.execute("""
|
||||
UPDATE songs SET
|
||||
library_id=?, title=?, artist=?, album=?, bpm=?, duration_sec=?,
|
||||
file_format=?, file_modified_at=?, file_missing=0, extra_tags=?
|
||||
WHERE id=?
|
||||
""", (
|
||||
song_data.get("library_id"),
|
||||
song_data.get("title", ""),
|
||||
song_data.get("artist", ""),
|
||||
song_data.get("album", ""),
|
||||
song_data.get("bpm", 0),
|
||||
song_data.get("duration_sec", 0),
|
||||
song_data.get("file_format", ""),
|
||||
song_data.get("file_modified_at", ""),
|
||||
extra_tags_json,
|
||||
song_id,
|
||||
))
|
||||
else:
|
||||
song_id = str(uuid.uuid4())
|
||||
conn.execute("""
|
||||
INSERT INTO songs
|
||||
(id, library_id, local_path, title, artist, album,
|
||||
bpm, duration_sec, file_format, file_modified_at, extra_tags)
|
||||
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
||||
""", (
|
||||
song_id,
|
||||
song_data.get("library_id"),
|
||||
song_data["local_path"],
|
||||
song_data.get("title", ""),
|
||||
song_data.get("artist", ""),
|
||||
song_data.get("album", ""),
|
||||
song_data.get("bpm", 0),
|
||||
song_data.get("duration_sec", 0),
|
||||
song_data.get("file_format", ""),
|
||||
song_data.get("file_modified_at", ""),
|
||||
extra_tags_json,
|
||||
))
|
||||
|
||||
# Opdater danse hvis de er med i data — bevar eksisterende og merge
|
||||
if "dances" in song_data:
|
||||
file_dances = []
|
||||
for dance in song_data["dances"]:
|
||||
name = dance.get("name", dance) if isinstance(dance, dict) else dance
|
||||
if name:
|
||||
file_dances.append(name.strip())
|
||||
|
||||
# Find eksisterende song_dances via dances tabel
|
||||
existing = conn.execute("""
|
||||
SELECT sd.id, d.name, sd.dance_order, d.level_id, d.id as dance_id
|
||||
FROM song_dances sd
|
||||
JOIN dances d ON d.id = sd.dance_id
|
||||
WHERE sd.song_id=? ORDER BY sd.dance_order
|
||||
""", (song_id,)).fetchall()
|
||||
existing_map = {r["name"].lower(): r for r in existing}
|
||||
file_lower = [d.lower() for d in file_dances]
|
||||
|
||||
# Slet danse der ikke længere er i filen
|
||||
for row in existing:
|
||||
if row["name"].lower() not in file_lower:
|
||||
conn.execute("DELETE FROM song_dances WHERE id=?", (row["id"],))
|
||||
|
||||
# Tilføj eller opdater danse fra filen
|
||||
for i, name in enumerate(file_dances, start=1):
|
||||
ex = existing_map.get(name.lower())
|
||||
if ex:
|
||||
conn.execute(
|
||||
"UPDATE song_dances SET dance_order=? WHERE id=?",
|
||||
(i, ex["id"])
|
||||
)
|
||||
else:
|
||||
# Opret eller find dans (name + NULL level = ny dans uden niveau)
|
||||
dance_id = get_or_create_dance(name, None, conn)
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO song_dances (song_id, dance_id, dance_order) "
|
||||
"VALUES (?,?,?)",
|
||||
(song_id, dance_id, i)
|
||||
)
|
||||
|
||||
return song_id
|
||||
|
||||
|
||||
def mark_song_missing(local_path: str):
|
||||
with get_db() as conn:
|
||||
conn.execute(
|
||||
"UPDATE songs SET file_missing=1 WHERE local_path=?", (local_path,)
|
||||
)
|
||||
|
||||
|
||||
def get_song_by_path(local_path: str) -> sqlite3.Row | None:
|
||||
with get_db() as conn:
|
||||
return conn.execute(
|
||||
"SELECT * FROM songs WHERE local_path=?", (local_path,)
|
||||
).fetchone()
|
||||
|
||||
|
||||
def search_songs(query: str, limit: int = 50) -> list[sqlite3.Row]:
|
||||
"""Søg i alle tags — titel, artist, album, danse og alle øvrige tags."""
|
||||
pattern = f"%{query}%"
|
||||
with get_db() as conn:
|
||||
return conn.execute("""
|
||||
SELECT DISTINCT s.* FROM songs s
|
||||
LEFT JOIN song_dances sd ON sd.song_id = s.id
|
||||
LEFT JOIN dances d ON d.id = sd.dance_id
|
||||
WHERE s.file_missing = 0
|
||||
AND (
|
||||
s.title LIKE ? OR
|
||||
s.artist LIKE ? OR
|
||||
s.album LIKE ? OR
|
||||
d.name LIKE ? OR
|
||||
s.extra_tags LIKE ?
|
||||
)
|
||||
ORDER BY s.artist, s.title
|
||||
LIMIT ?
|
||||
""", (pattern, pattern, pattern, pattern, pattern, limit)).fetchall()
|
||||
|
||||
|
||||
def get_songs_for_library(library_id: int) -> list[sqlite3.Row]:
|
||||
with get_db() as conn:
|
||||
return conn.execute(
|
||||
"SELECT * FROM songs WHERE library_id=? ORDER BY artist, title",
|
||||
(library_id,)
|
||||
).fetchall()
|
||||
|
||||
|
||||
def get_all_song_paths_for_library(library_id: int) -> dict[str, str]:
|
||||
"""Returnerer {local_path: file_modified_at} — bruges til fuld scan."""
|
||||
with get_db() as conn:
|
||||
rows = conn.execute(
|
||||
"SELECT local_path, file_modified_at FROM songs WHERE library_id=?",
|
||||
(library_id,)
|
||||
).fetchall()
|
||||
return {row["local_path"]: row["file_modified_at"] for row in rows}
|
||||
|
||||
|
||||
# ── Afspilningslister ─────────────────────────────────────────────────────────
|
||||
|
||||
def create_playlist(name: str, description: str = "") -> int:
|
||||
with get_db() as conn:
|
||||
cur = conn.execute(
|
||||
"INSERT INTO playlists (name, description) VALUES (?,?)",
|
||||
(name, description)
|
||||
)
|
||||
return cur.lastrowid
|
||||
|
||||
|
||||
def get_playlists() -> list[sqlite3.Row]:
|
||||
with get_db() as conn:
|
||||
return conn.execute(
|
||||
"SELECT * FROM playlists ORDER BY created_at DESC"
|
||||
).fetchall()
|
||||
|
||||
|
||||
def add_song_to_playlist(playlist_id: int, song_id: str, position: int | None = None) -> int:
|
||||
with get_db() as conn:
|
||||
if position is None:
|
||||
row = conn.execute(
|
||||
"SELECT MAX(position) as max_pos FROM playlist_songs WHERE playlist_id=?",
|
||||
(playlist_id,)
|
||||
).fetchone()
|
||||
position = (row["max_pos"] or 0) + 1
|
||||
|
||||
cur = conn.execute(
|
||||
"INSERT INTO playlist_songs (playlist_id, song_id, position) VALUES (?,?,?)",
|
||||
(playlist_id, song_id, position)
|
||||
)
|
||||
return cur.lastrowid
|
||||
|
||||
|
||||
def update_playlist_song_status(playlist_song_id: int, status: str):
|
||||
valid = {"pending", "playing", "played", "skipped"}
|
||||
if status not in valid:
|
||||
raise ValueError(f"Ugyldig status: {status}")
|
||||
with get_db() as conn:
|
||||
conn.execute(
|
||||
"UPDATE playlist_songs SET status=? WHERE id=?",
|
||||
(status, playlist_song_id)
|
||||
)
|
||||
|
||||
|
||||
def get_playlist_with_songs(playlist_id: int) -> dict:
|
||||
with get_db() as conn:
|
||||
playlist = conn.execute(
|
||||
"SELECT * FROM playlists WHERE id=?", (playlist_id,)
|
||||
).fetchone()
|
||||
if not playlist:
|
||||
return {}
|
||||
|
||||
songs = conn.execute("""
|
||||
SELECT ps.id as ps_id, ps.position, ps.status,
|
||||
s.*, GROUP_CONCAT(sd.dance_name ORDER BY sd.dance_order) as dances
|
||||
FROM playlist_songs ps
|
||||
JOIN songs s ON s.id = ps.song_id
|
||||
LEFT JOIN song_dances sd ON sd.song_id = s.id
|
||||
WHERE ps.playlist_id = ?
|
||||
GROUP BY ps.id
|
||||
ORDER BY ps.position
|
||||
""", (playlist_id,)).fetchall()
|
||||
|
||||
return {"playlist": dict(playlist), "songs": [dict(s) for s in songs]}
|
||||
|
||||
|
||||
# ── Event-state (gemmes løbende så man kan genstarte efter strømsvigt) ────────
|
||||
|
||||
def save_event_state(current_idx: int, statuses: list[str]):
|
||||
"""Gem event-fremgang — overskrives ved hver ændring."""
|
||||
import json
|
||||
with get_db() as conn:
|
||||
conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('current_idx',?)",
|
||||
(str(current_idx),))
|
||||
conn.execute("INSERT OR REPLACE INTO event_state (key,value) VALUES ('statuses',?)",
|
||||
(json.dumps(statuses),))
|
||||
|
||||
|
||||
def load_event_state() -> tuple[int, list[str]] | None:
|
||||
"""Indlæs gemt event-fremgang. Returnerer None hvis ingen gemt tilstand."""
|
||||
import json
|
||||
with get_db() as conn:
|
||||
idx_row = conn.execute(
|
||||
"SELECT value FROM event_state WHERE key='current_idx'"
|
||||
).fetchone()
|
||||
sta_row = conn.execute(
|
||||
"SELECT value FROM event_state WHERE key='statuses'"
|
||||
).fetchone()
|
||||
if not idx_row or not sta_row:
|
||||
return None
|
||||
return int(idx_row["value"]), json.loads(sta_row["value"])
|
||||
|
||||
|
||||
def clear_event_state():
|
||||
"""Nulstil gemt event-tilstand (bruges ved 'Start event')."""
|
||||
with get_db() as conn:
|
||||
conn.execute("DELETE FROM event_state")
|
||||
|
||||
|
||||
# ── Dans-navne ordbog ─────────────────────────────────────────────────────────
|
||||
|
||||
# ── Dans-entitet funktioner ───────────────────────────────────────────────────
|
||||
|
||||
def get_or_create_dance(name: str, level_id: int | None,
|
||||
conn=None) -> int:
|
||||
"""Find eller opret en dans (name + level_id kombination).
|
||||
Returnerer dance_id. conn er valgfri — bruges ved nested kald."""
|
||||
name = name.strip()
|
||||
close = False
|
||||
if conn is None:
|
||||
conn = new_conn()
|
||||
close = True
|
||||
try:
|
||||
existing = conn.execute(
|
||||
"SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?",
|
||||
(name, level_id)
|
||||
).fetchone()
|
||||
if existing:
|
||||
conn.execute(
|
||||
"UPDATE dances SET use_count=use_count+1 WHERE id=?",
|
||||
(existing["id"],)
|
||||
)
|
||||
return existing["id"]
|
||||
conn.execute(
|
||||
"INSERT INTO dances (name, level_id, use_count, source) VALUES (?,?,1,'local')",
|
||||
(name, level_id)
|
||||
)
|
||||
return conn.execute(
|
||||
"SELECT id FROM dances WHERE name=? COLLATE NOCASE AND level_id IS ?",
|
||||
(name, level_id)
|
||||
).fetchone()["id"]
|
||||
finally:
|
||||
if close:
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
|
||||
def get_dance_suggestions(prefix: str, limit: int = 20) -> list[dict]:
|
||||
"""Returnerer danse der starter med prefix som {id, name, level_id, level_name}.
|
||||
Sorteret efter popularitet — bruges til autoudfyld."""
|
||||
with get_db() as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT d.id, d.name, d.level_id, d.use_count,
|
||||
dl.name as level_name, dl.sort_order
|
||||
FROM dances d
|
||||
LEFT JOIN dance_levels dl ON dl.id = d.level_id
|
||||
WHERE d.name LIKE ? COLLATE NOCASE
|
||||
ORDER BY d.use_count DESC, dl.sort_order, d.name
|
||||
LIMIT ?
|
||||
""", (f"{prefix}%", limit)).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def get_dances_for_song(song_id: str) -> list[dict]:
|
||||
"""Hent hoveddanse for en sang med niveau-info."""
|
||||
with get_db() as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT d.id as dance_id, d.name, d.level_id,
|
||||
dl.name as level_name, sd.dance_order, sd.id as song_dance_id
|
||||
FROM song_dances sd
|
||||
JOIN dances d ON d.id = sd.dance_id
|
||||
LEFT JOIN dance_levels dl ON dl.id = d.level_id
|
||||
WHERE sd.song_id=? ORDER BY sd.dance_order
|
||||
""", (song_id,)).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
def get_alt_dances_for_song(song_id: str) -> list[dict]:
|
||||
"""Hent alternativ-danse for en sang med niveau-info."""
|
||||
with get_db() as conn:
|
||||
rows = conn.execute("""
|
||||
SELECT d.id as dance_id, d.name, d.level_id,
|
||||
dl.name as level_name, sad.note, sad.source, sad.id as alt_id
|
||||
FROM song_alt_dances sad
|
||||
JOIN dances d ON d.id = sad.dance_id
|
||||
LEFT JOIN dance_levels dl ON dl.id = d.level_id
|
||||
WHERE sad.song_id=? ORDER BY d.name
|
||||
""", (song_id,)).fetchall()
|
||||
return [dict(r) for r in rows]
|
||||
|
||||
|
||||
# ── Dans-niveauer ─────────────────────────────────────────────────────────────
|
||||
|
||||
def get_dance_levels() -> list[sqlite3.Row]:
|
||||
"""Hent alle niveauer sorteret efter sort_order."""
|
||||
with get_db() as conn:
|
||||
return conn.execute(
|
||||
"SELECT * FROM dance_levels ORDER BY sort_order"
|
||||
).fetchall()
|
||||
|
||||
|
||||
def sync_dance_levels_from_api(levels: list[dict]):
|
||||
"""Synkroniser niveauer fra API."""
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
with get_db() as conn:
|
||||
for lvl in levels:
|
||||
conn.execute("""
|
||||
INSERT INTO dance_levels (sort_order, name, description, synced_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(name) DO UPDATE SET
|
||||
sort_order = excluded.sort_order,
|
||||
description = excluded.description,
|
||||
synced_at = excluded.synced_at
|
||||
""", (lvl["sort_order"], lvl["name"], lvl.get("description", ""), now))
|
||||
|
||||
|
||||
def sync_dances_from_api(dances: list[dict]):
|
||||
"""Synkroniser danse fra API — {name, level_id, use_count}."""
|
||||
from datetime import datetime, timezone
|
||||
now = datetime.now(timezone.utc).isoformat()
|
||||
with get_db() as conn:
|
||||
for d in dances:
|
||||
conn.execute("""
|
||||
INSERT INTO dances (name, level_id, use_count, source, synced_at)
|
||||
VALUES (?, ?, ?, 'community', ?)
|
||||
ON CONFLICT(name, level_id) DO UPDATE SET
|
||||
use_count = MAX(use_count, excluded.use_count),
|
||||
synced_at = excluded.synced_at
|
||||
""", (d["name"], d.get("level_id"), d.get("use_count", 1), now))
|
||||
|
||||
|
||||
# Backwards compat alias
|
||||
def get_dance_name_suggestions(prefix: str, limit: int = 20) -> list[str]:
|
||||
"""Returnerer dans-navne som strings — bruges af AutoLineEdit."""
|
||||
suggestions = get_dance_suggestions(prefix, limit)
|
||||
result = []
|
||||
for s in suggestions:
|
||||
if s.get("level_name"):
|
||||
result.append(f"{s['name']} / {s['level_name']}")
|
||||
else:
|
||||
result.append(s["name"])
|
||||
return result
|
||||
|
||||
|
||||
391
linedance-app/local/tag_reader.py
Normal file
391
linedance-app/local/tag_reader.py
Normal file
@@ -0,0 +1,391 @@
|
||||
"""
|
||||
tag_reader.py — Læser og skriver metadata fra lydfiler.
|
||||
|
||||
Understøttede formater og danse-tag support:
|
||||
MP3 — læs + skriv danse (ID3 TXXX-felter)
|
||||
FLAC — læs + skriv danse (Vorbis Comments)
|
||||
OGG — læs + skriv danse (Vorbis Comments)
|
||||
OPUS — læs + skriv danse (Vorbis Comments)
|
||||
M4A — læs + skriv danse (MP4 custom felt ----:LINEDANCE:DANCE)
|
||||
WAV — læs metadata, ingen danse-tag support
|
||||
WMA — læs metadata, ingen danse-tag support
|
||||
AIFF — læs metadata, ingen danse-tag support
|
||||
|
||||
Danse gemmes ALTID i SQLite uanset format.
|
||||
Fil-skrivning er kun muligt for de formater der understøtter custom tags.
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
from mutagen import File as MutagenFile
|
||||
from mutagen.id3 import ID3, TXXX
|
||||
from mutagen.flac import FLAC
|
||||
from mutagen.mp4 import MP4, MP4FreeForm
|
||||
MUTAGEN_AVAILABLE = True
|
||||
except ImportError:
|
||||
MUTAGEN_AVAILABLE = False
|
||||
print("Advarsel: mutagen ikke installeret — tag-læsning deaktiveret")
|
||||
|
||||
|
||||
# Filtyper vi høster metadata fra
|
||||
SUPPORTED_EXTENSIONS = {
|
||||
".mp3", ".flac", ".wav", ".m4a", ".aac",
|
||||
".ogg", ".opus", ".wma", ".aiff", ".aif",
|
||||
}
|
||||
|
||||
# Formater der understøtter skrivning af danse-tags til fil
|
||||
WRITABLE_DANCE_FORMATS = {".mp3", ".flac", ".ogg", ".opus", ".m4a"}
|
||||
|
||||
# Tag-nøgler brugt på tværs af formater
|
||||
TXXX_DANCE_PREFIX = "LINEDANCE_DANCE_" # MP3: TXXX:LINEDANCE_DANCE_1
|
||||
VORBIS_DANCE_KEY = "linedance_dance" # FLAC/OGG: linedance_dance.1
|
||||
M4A_DANCE_FREEFORM = "----:LINEDANCE:DANCE" # M4A: ----:LINEDANCE:DANCE (liste)
|
||||
|
||||
|
||||
def is_supported(path: str | Path) -> bool:
|
||||
return Path(path).suffix.lower() in SUPPORTED_EXTENSIONS
|
||||
|
||||
|
||||
def can_write_dances(path: str | Path) -> bool:
|
||||
"""Returnerer True hvis formatet understøtter skrivning af danse-tags til fil."""
|
||||
return Path(path).suffix.lower() in WRITABLE_DANCE_FORMATS
|
||||
|
||||
|
||||
def get_file_modified_at(path: str | Path) -> str:
|
||||
ts = os.path.getmtime(str(path))
|
||||
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
||||
|
||||
|
||||
# ── Læsning ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def read_tags(path: str | Path) -> dict:
|
||||
"""
|
||||
Læser metadata og danse fra en lydfil.
|
||||
Returnerer dict med: title, artist, album, bpm, duration_sec,
|
||||
file_format, file_modified_at, dances, can_write_dances,
|
||||
extra_tags (dict med alle øvrige tags som {navn: værdi}).
|
||||
"""
|
||||
path = Path(path)
|
||||
result = {
|
||||
"local_path": str(path),
|
||||
"title": path.stem,
|
||||
"artist": "",
|
||||
"album": "",
|
||||
"bpm": 0,
|
||||
"duration_sec": 0,
|
||||
"file_format": path.suffix.lower().lstrip("."),
|
||||
"file_modified_at": get_file_modified_at(path),
|
||||
"dances": [],
|
||||
"can_write_dances": can_write_dances(path),
|
||||
"extra_tags": {},
|
||||
}
|
||||
|
||||
if not MUTAGEN_AVAILABLE:
|
||||
return result
|
||||
|
||||
try:
|
||||
audio = MutagenFile(str(path), easy=False)
|
||||
if audio is None:
|
||||
return result
|
||||
|
||||
if hasattr(audio, "info") and audio.info:
|
||||
result["duration_sec"] = int(getattr(audio.info, "length", 0))
|
||||
|
||||
ext = path.suffix.lower()
|
||||
|
||||
if ext == ".mp3":
|
||||
_read_mp3(audio, result)
|
||||
elif ext == ".flac":
|
||||
_read_vorbis(audio, result)
|
||||
elif ext in (".ogg", ".opus"):
|
||||
_read_vorbis(audio, result)
|
||||
elif ext in (".m4a", ".aac", ".mp4"):
|
||||
_read_m4a(audio, result)
|
||||
else:
|
||||
_read_generic(audio, result)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Fejl ved læsning af {path}: {e}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def _read_mp3(audio, result: dict):
|
||||
tags = audio.tags
|
||||
if not tags:
|
||||
return
|
||||
if "TIT2" in tags:
|
||||
result["title"] = str(tags["TIT2"].text[0])
|
||||
if "TPE1" in tags:
|
||||
result["artist"] = str(tags["TPE1"].text[0])
|
||||
if "TALB" in tags:
|
||||
result["album"] = str(tags["TALB"].text[0])
|
||||
if "TBPM" in tags:
|
||||
try:
|
||||
result["bpm"] = int(float(str(tags["TBPM"].text[0])))
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
dances = {}
|
||||
extra = {}
|
||||
# Kendte ID3-felt-navne til menneskelige navne
|
||||
ID3_NAMES = {
|
||||
"TIT2": "titel", "TPE1": "artist", "TALB": "album", "TBPM": "bpm",
|
||||
"TYER": "år", "TDRC": "dato", "TCON": "genre", "TPE2": "albumartist",
|
||||
"TPOS": "disknummer", "TRCK": "spornummer", "TCOM": "komponist",
|
||||
"TLYR": "sangtekst", "TCOP": "copyright", "TPUB": "udgiver",
|
||||
"TENC": "kodet_af", "TLAN": "sprog", "TMOO": "stemning",
|
||||
"TPE3": "dirigent", "TPE4": "fortolket_af", "TOAL": "original_album",
|
||||
"TOPE": "original_artist", "TORY": "original_år",
|
||||
}
|
||||
for key, frame in tags.items():
|
||||
if key.startswith("TXXX:") and TXXX_DANCE_PREFIX in key:
|
||||
try:
|
||||
num = int(key.replace(f"TXXX:{TXXX_DANCE_PREFIX}", ""))
|
||||
dances[num] = str(frame.text[0])
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
elif key.startswith("TXXX:"):
|
||||
# Custom TXXX-felt — gem under dets beskrivelse
|
||||
desc = key[5:] # fjern "TXXX:"
|
||||
try:
|
||||
extra[desc] = str(frame.text[0])
|
||||
except Exception:
|
||||
pass
|
||||
elif key in ID3_NAMES and key not in ("TIT2","TPE1","TALB","TBPM"):
|
||||
# Standardfelt vi ikke allerede har gemt
|
||||
try:
|
||||
val = str(frame.text[0]) if hasattr(frame, "text") else str(frame)
|
||||
if val:
|
||||
extra[ID3_NAMES[key]] = val
|
||||
except Exception:
|
||||
pass
|
||||
elif hasattr(frame, "text") and key not in ("TIT2","TPE1","TALB","TBPM"):
|
||||
# Alle andre tekstfelter
|
||||
try:
|
||||
val = str(frame.text[0])
|
||||
if val and not key.startswith("APIC"): # spring albumcover over
|
||||
extra[key] = val
|
||||
except Exception:
|
||||
pass
|
||||
result["dances"] = [dances[k] for k in sorted(dances.keys())]
|
||||
result["extra_tags"] = extra
|
||||
|
||||
|
||||
def _read_vorbis(audio, result: dict):
|
||||
"""FLAC og OGG/Opus bruger begge Vorbis Comments."""
|
||||
tags = audio.tags
|
||||
if not tags:
|
||||
return
|
||||
result["title"] = tags.get("title", [result["title"]])[0]
|
||||
result["artist"] = tags.get("artist", [""])[0]
|
||||
result["album"] = tags.get("album", [""])[0]
|
||||
try:
|
||||
result["bpm"] = int(tags.get("bpm", [0])[0])
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
# Danse
|
||||
dances = {}
|
||||
for key, values in tags.items():
|
||||
if key.lower().startswith(f"{VORBIS_DANCE_KEY}."):
|
||||
try:
|
||||
num = int(key.split(".")[-1])
|
||||
dances[num] = values[0]
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
if not dances and VORBIS_DANCE_KEY in tags:
|
||||
result["dances"] = [d.strip() for d in tags[VORBIS_DANCE_KEY][0].split(",") if d.strip()]
|
||||
else:
|
||||
result["dances"] = [dances[k] for k in sorted(dances.keys())]
|
||||
# Alle øvrige tags som extra_tags
|
||||
skip = {"title", "artist", "album", "bpm", VORBIS_DANCE_KEY}
|
||||
extra = {}
|
||||
for key, values in tags.items():
|
||||
k = key.lower()
|
||||
if k not in skip and not k.startswith(VORBIS_DANCE_KEY):
|
||||
try:
|
||||
extra[k] = str(values[0])
|
||||
except Exception:
|
||||
pass
|
||||
result["extra_tags"] = extra
|
||||
|
||||
|
||||
def _read_m4a(audio, result: dict):
|
||||
tags = audio.tags
|
||||
if not tags:
|
||||
return
|
||||
if "\xa9nam" in tags:
|
||||
result["title"] = str(tags["\xa9nam"][0])
|
||||
if "\xa9ART" in tags:
|
||||
result["artist"] = str(tags["\xa9ART"][0])
|
||||
if "\xa9alb" in tags:
|
||||
result["album"] = str(tags["\xa9alb"][0])
|
||||
if "tmpo" in tags:
|
||||
try:
|
||||
result["bpm"] = int(tags["tmpo"][0])
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
if M4A_DANCE_FREEFORM in tags:
|
||||
result["dances"] = [
|
||||
v.decode("utf-8") if isinstance(v, (bytes, MP4FreeForm)) else str(v)
|
||||
for v in tags[M4A_DANCE_FREEFORM]
|
||||
]
|
||||
# Menneskelige navne til M4A-nøgler
|
||||
M4A_NAMES = {
|
||||
"\xa9nam": "titel", "\xa9ART": "artist", "\xa9alb": "album",
|
||||
"\xa9day": "år", "\xa9gen": "genre", "\xa9wrt": "komponist",
|
||||
"\xa9cmt": "kommentar", "aART": "albumartist", "trkn": "spornummer",
|
||||
"disk": "disknummer", "cprt": "copyright", "\xa9lyr": "sangtekst",
|
||||
"tmpo": "bpm",
|
||||
}
|
||||
skip_keys = {"\xa9nam", "\xa9ART", "\xa9alb", "tmpo", M4A_DANCE_FREEFORM, "covr"}
|
||||
extra = {}
|
||||
for key, values in tags.items():
|
||||
if key in skip_keys:
|
||||
continue
|
||||
label = M4A_NAMES.get(key, key)
|
||||
try:
|
||||
val = values[0]
|
||||
if isinstance(val, (bytes, MP4FreeForm)):
|
||||
val = val.decode("utf-8", errors="replace")
|
||||
extra[label] = str(val)
|
||||
except Exception:
|
||||
pass
|
||||
result["extra_tags"] = extra
|
||||
|
||||
|
||||
def _read_generic(audio, result: dict):
|
||||
try:
|
||||
easy = MutagenFile(result["local_path"], easy=True)
|
||||
if easy and easy.tags:
|
||||
result["title"] = easy.tags.get("title", [result["title"]])[0]
|
||||
result["artist"] = easy.tags.get("artist", [""])[0]
|
||||
result["album"] = easy.tags.get("album", [""])[0]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ── Skrivning ─────────────────────────────────────────────────────────────────
|
||||
|
||||
def write_dances(path: str | Path, dances: list[str]) -> bool:
|
||||
"""
|
||||
Skriver danse til filen hvis formatet understøtter det.
|
||||
Returnerer True ved succes, False hvis formatet ikke understøtter det.
|
||||
Kaster Exception ved fejl under skrivning.
|
||||
"""
|
||||
if not MUTAGEN_AVAILABLE:
|
||||
return False
|
||||
|
||||
path = Path(path)
|
||||
ext = path.suffix.lower()
|
||||
|
||||
if ext not in WRITABLE_DANCE_FORMATS:
|
||||
return False
|
||||
|
||||
if ext == ".mp3":
|
||||
return _write_mp3_dances(path, dances)
|
||||
elif ext in (".flac", ".ogg", ".opus"):
|
||||
return _write_vorbis_dances(path, dances)
|
||||
elif ext in (".m4a", ".aac"):
|
||||
return _write_m4a_dances(path, dances)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _write_mp3_dances(path: Path, dances: list[str]) -> bool:
|
||||
try:
|
||||
tags = ID3(str(path))
|
||||
for key in [k for k in tags.keys() if TXXX_DANCE_PREFIX in k]:
|
||||
del tags[key]
|
||||
for i, name in enumerate(dances, start=1):
|
||||
tags.add(TXXX(encoding=3, desc=f"{TXXX_DANCE_PREFIX}{i}", text=name))
|
||||
tags.save(str(path))
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"MP3 skrivefejl {path}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def _write_vorbis_dances(path: Path, dances: list[str]) -> bool:
|
||||
try:
|
||||
audio = MutagenFile(str(path), easy=False)
|
||||
if audio is None or audio.tags is None:
|
||||
return False
|
||||
# Slet eksisterende danse-felter
|
||||
keys_to_delete = [k for k in audio.tags.keys() if k.lower().startswith(f"{VORBIS_DANCE_KEY}.")]
|
||||
for key in keys_to_delete:
|
||||
del audio.tags[key]
|
||||
# Skriv nye — ét felt per dans
|
||||
for i, name in enumerate(dances, start=1):
|
||||
audio.tags[f"{VORBIS_DANCE_KEY}.{i}"] = name
|
||||
audio.save()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"Vorbis skrivefejl {path}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def _write_m4a_dances(path: Path, dances: list[str]) -> bool:
|
||||
try:
|
||||
audio = MP4(str(path))
|
||||
audio.tags[M4A_DANCE_FREEFORM] = [
|
||||
MP4FreeForm(name.encode("utf-8")) for name in dances
|
||||
]
|
||||
audio.save()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"M4A skrivefejl {path}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
# ── Hurtig læsning af kun danse (uden fuld tag-scan) ─────────────────────────
|
||||
|
||||
def read_dances_from_file(path: str | Path) -> list[str]:
|
||||
"""Læser kun danse fra en fil — hurtigere end fuld read_tags()."""
|
||||
tags = read_tags(path)
|
||||
return tags.get("dances", [])
|
||||
|
||||
|
||||
# ── BPM-analyse ───────────────────────────────────────────────────────────────
|
||||
|
||||
def analyze_bpm(path: str | Path) -> float | None:
|
||||
"""
|
||||
Analysér BPM fra lydfilen ved hjælp af librosa.
|
||||
Returnerer BPM som float eller None ved fejl.
|
||||
Tager 2-5 sekunder per sang — kør i baggrundstråd.
|
||||
"""
|
||||
try:
|
||||
import librosa
|
||||
# Indlæs kun de første 60 sekunder for hastighed
|
||||
y, sr = librosa.load(str(path), duration=60.0, mono=True)
|
||||
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
|
||||
# librosa returnerer array i nyere versioner
|
||||
if hasattr(tempo, "__len__"):
|
||||
bpm = float(tempo[0]) if len(tempo) > 0 else 0.0
|
||||
else:
|
||||
bpm = float(tempo)
|
||||
return round(bpm, 1) if bpm > 0 else None
|
||||
except ImportError:
|
||||
print("librosa ikke installeret — installer med: pip install librosa")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"BPM-analyse fejl for {path}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def analyze_and_save_bpm(path: str | Path, song_id: str) -> float | None:
|
||||
"""Analysér BPM og gem i SQLite. Returnerer målt BPM."""
|
||||
bpm = analyze_bpm(path)
|
||||
if bpm and bpm > 0:
|
||||
try:
|
||||
from local.local_db import get_db
|
||||
with get_db() as conn:
|
||||
conn.execute(
|
||||
"UPDATE songs SET bpm=? WHERE id=? AND (bpm IS NULL OR bpm=0)",
|
||||
(int(round(bpm)), song_id)
|
||||
)
|
||||
except Exception as e:
|
||||
print(f"BPM gem fejl: {e}")
|
||||
return bpm
|
||||
Reference in New Issue
Block a user