392 lines
14 KiB
Python
392 lines
14 KiB
Python
"""
|
|
tag_reader.py — Læser og skriver metadata fra lydfiler.
|
|
|
|
Understøttede formater og danse-tag support:
|
|
MP3 — læs + skriv danse (ID3 TXXX-felter)
|
|
FLAC — læs + skriv danse (Vorbis Comments)
|
|
OGG — læs + skriv danse (Vorbis Comments)
|
|
OPUS — læs + skriv danse (Vorbis Comments)
|
|
M4A — læs + skriv danse (MP4 custom felt ----:LINEDANCE:DANCE)
|
|
WAV — læs metadata, ingen danse-tag support
|
|
WMA — læs metadata, ingen danse-tag support
|
|
AIFF — læs metadata, ingen danse-tag support
|
|
|
|
Danse gemmes ALTID i SQLite uanset format.
|
|
Fil-skrivning er kun muligt for de formater der understøtter custom tags.
|
|
"""
|
|
|
|
import os
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
try:
|
|
from mutagen import File as MutagenFile
|
|
from mutagen.id3 import ID3, TXXX
|
|
from mutagen.flac import FLAC
|
|
from mutagen.mp4 import MP4, MP4FreeForm
|
|
MUTAGEN_AVAILABLE = True
|
|
except ImportError:
|
|
MUTAGEN_AVAILABLE = False
|
|
print("Advarsel: mutagen ikke installeret — tag-læsning deaktiveret")
|
|
|
|
|
|
# File types we harvest metadata from.
SUPPORTED_EXTENSIONS = {
    ".mp3",
    ".flac",
    ".wav",
    ".m4a",
    ".aac",
    ".ogg",
    ".opus",
    ".wma",
    ".aiff",
    ".aif",
}

# Formats whose tag containers can carry the custom dance tags.
WRITABLE_DANCE_FORMATS = {
    ".mp3",
    ".flac",
    ".ogg",
    ".opus",
    ".m4a",
}

# Tag keys used for dances, one scheme per container format.
# MP3: one TXXX frame per dance, e.g. TXXX:LINEDANCE_DANCE_1
TXXX_DANCE_PREFIX = "LINEDANCE_DANCE_"
# FLAC/OGG: one comment per dance, e.g. linedance_dance.1
VORBIS_DANCE_KEY = "linedance_dance"
# M4A: a single freeform atom holding a list of dances
M4A_DANCE_FREEFORM = "----:LINEDANCE:DANCE"
|
|
|
|
|
|
def is_supported(path: str | Path) -> bool:
    """Return True when the path's extension (case-insensitive) is one we read metadata from."""
    extension = Path(path).suffix.lower()
    return extension in SUPPORTED_EXTENSIONS
|
|
|
|
|
|
def can_write_dances(path: str | Path) -> bool:
    """Return True when the file's format supports writing dance tags into the file."""
    extension = Path(path).suffix.lower()
    return extension in WRITABLE_DANCE_FORMATS
|
|
|
|
|
|
def get_file_modified_at(path: str | Path) -> str:
|
|
ts = os.path.getmtime(str(path))
|
|
return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat()
|
|
|
|
|
|
# ── Læsning ───────────────────────────────────────────────────────────────────
|
|
|
|
def read_tags(path: str | Path) -> dict:
    """
    Read metadata and dances from an audio file.

    Returns a dict with the keys: local_path, title, artist, album, bpm,
    duration_sec, file_format, file_modified_at, dances, can_write_dances
    and extra_tags (a dict of all remaining tags as {name: value}).

    The dict is pre-filled with defaults (title = file stem, empty strings,
    zeros, empty containers). Those defaults are returned unchanged when
    mutagen is not available or mutagen does not recognise the file. Any
    exception raised while parsing is printed and whatever was collected
    up to that point is returned.
    """
    path = Path(path)
    suffix = path.suffix.lower()

    result = {
        "local_path": str(path),
        "title": path.stem,
        "artist": "",
        "album": "",
        "bpm": 0,
        "duration_sec": 0,
        "file_format": suffix.lstrip("."),
        "file_modified_at": get_file_modified_at(path),
        "dances": [],
        "can_write_dances": can_write_dances(path),
        "extra_tags": {},
    }

    if not MUTAGEN_AVAILABLE:
        return result

    try:
        audio = MutagenFile(str(path), easy=False)
        if audio is None:
            return result

        info = getattr(audio, "info", None)
        if info:
            result["duration_sec"] = int(getattr(info, "length", 0))

        # One reader per tag container; anything else goes to the generic reader.
        readers = {
            ".mp3": _read_mp3,
            ".flac": _read_vorbis,
            ".ogg": _read_vorbis,
            ".opus": _read_vorbis,
            ".m4a": _read_m4a,
            ".aac": _read_m4a,
            ".mp4": _read_m4a,
        }
        reader = readers.get(suffix, _read_generic)
        reader(audio, result)
    except Exception as e:
        print(f"Fejl ved læsning af {path}: {e}")

    return result
|
|
|
|
|
|
def _id3_first_text(frame) -> str | None:
    """Return the first text value of an ID3 frame, or None when it has none.

    Most text frames expose ``text`` as a list of values, but some frames
    (e.g. unsynchronised lyrics) expose a plain string; indexing such a
    string would return only its first character, so it is returned whole.
    """
    text = getattr(frame, "text", None)
    if text is None:
        return None
    if isinstance(text, str):
        return text
    try:
        return str(text[0])
    except (IndexError, KeyError, TypeError):
        return None


def _read_mp3(audio, result: dict):
    """Fill ``result`` from the ID3 tags of an MP3 file.

    Sets title/artist/album/bpm when the corresponding frames carry a value,
    collects dances from the numbered ``TXXX:LINEDANCE_DANCE_<n>`` frames
    (ordered by <n>) and stores every other textual frame in
    ``result["extra_tags"]``. Frames without a usable text value are skipped
    instead of aborting the whole read.
    """
    tags = audio.tags
    if not tags:
        return

    core_fields = (("TIT2", "title"), ("TPE1", "artist"), ("TALB", "album"))
    for frame_id, field in core_fields:
        if frame_id in tags:
            value = _id3_first_text(tags[frame_id])
            if value is not None:
                result[field] = value

    if "TBPM" in tags:
        try:
            # BPM is often stored with decimals ("128.00"), hence float first.
            result["bpm"] = int(float(_id3_first_text(tags["TBPM"])))
        except (ValueError, TypeError, OverflowError):
            pass

    # Known ID3 frame ids mapped to human-readable names (runtime keys, kept as-is).
    id3_names = {
        "TIT2": "titel", "TPE1": "artist", "TALB": "album", "TBPM": "bpm",
        "TYER": "år", "TDRC": "dato", "TCON": "genre", "TPE2": "albumartist",
        "TPOS": "disknummer", "TRCK": "spornummer", "TCOM": "komponist",
        "TLYR": "sangtekst", "TCOP": "copyright", "TPUB": "udgiver",
        "TENC": "kodet_af", "TLAN": "sprog", "TMOO": "stemning",
        "TPE3": "dirigent", "TPE4": "fortolket_af", "TOAL": "original_album",
        "TOPE": "original_artist", "TORY": "original_år",
    }
    already_stored = {"TIT2", "TPE1", "TALB", "TBPM"}
    dance_key_prefix = f"TXXX:{TXXX_DANCE_PREFIX}"

    dances = {}
    extra = {}
    for key, frame in tags.items():
        if key in already_stored:
            continue

        if key.startswith(dance_key_prefix):
            # Dance frame: the suffix after the prefix is the position.
            try:
                position = int(key[len(dance_key_prefix):])
            except ValueError:
                continue
            name = _id3_first_text(frame)
            if name is not None:
                dances[position] = name
            continue

        if key.startswith("TXXX:"):
            # Custom TXXX field — stored under its description.
            value = _id3_first_text(frame)
            if value is not None:
                extra[key[5:]] = value
            continue

        if key in id3_names:
            # Standard frame we have not already stored above.
            value = _id3_first_text(frame) if hasattr(frame, "text") else str(frame)
            if value:
                extra[id3_names[key]] = value
            continue

        # Any other frame that carries text; frames without text
        # (cover art and other binary frames) yield None and are skipped.
        value = _id3_first_text(frame)
        if value and not key.startswith("APIC"):
            extra[key] = value

    result["dances"] = [dances[k] for k in sorted(dances)]
    result["extra_tags"] = extra
|
|
|
|
|
|
def _read_vorbis(audio, result: dict):
    """Fill ``result`` from Vorbis Comments (used by FLAC and OGG/Opus).

    Sets title/artist/album/bpm, collects dances from the numbered
    ``linedance_dance.<n>`` comments (ordered by <n>) and, when none exist,
    falls back to a single comma-separated ``linedance_dance`` comment.
    All remaining comments go to ``result["extra_tags"]`` under their
    lower-cased key.
    """
    tags = audio.tags
    if not tags:
        return

    def first(key, default):
        # Tolerate a missing key as well as an empty value list.
        values = tags.get(key)
        return values[0] if values else default

    result["title"] = first("title", result["title"])
    result["artist"] = first("artist", "")
    result["album"] = first("album", "")
    try:
        # Go through float so values such as "128.00" are accepted,
        # matching how the MP3 reader parses BPM.
        result["bpm"] = int(float(first("bpm", 0)))
    except (ValueError, TypeError, OverflowError):
        pass

    # Dances: one comment per dance, numbered after the dot.
    numbered_prefix = f"{VORBIS_DANCE_KEY}."
    dances = {}
    for key, values in tags.items():
        if key.lower().startswith(numbered_prefix):
            try:
                dances[int(key.split(".")[-1])] = values[0]
            except (ValueError, IndexError):
                pass

    if not dances and VORBIS_DANCE_KEY in tags:
        # Legacy layout: all dances in one comma-separated comment.
        legacy = tags[VORBIS_DANCE_KEY][0]
        result["dances"] = [d.strip() for d in legacy.split(",") if d.strip()]
    else:
        result["dances"] = [dances[k] for k in sorted(dances)]

    # Everything else is exposed as extra tags.
    skip = {"title", "artist", "album", "bpm", VORBIS_DANCE_KEY}
    extra = {}
    for key, values in tags.items():
        k = key.lower()
        if k in skip or k.startswith(VORBIS_DANCE_KEY):
            continue
        try:
            extra[k] = str(values[0])
        except (IndexError, KeyError, TypeError):
            pass
    result["extra_tags"] = extra
|
|
|
|
|
|
def _read_m4a(audio, result: dict):
    """Fill ``result`` from the MP4 atoms of an M4A/AAC/MP4 file.

    Sets title/artist/album/bpm when the atoms carry a value, reads the
    dance list from the ``----:LINEDANCE:DANCE`` freeform atom and stores
    every other atom (except cover art) in ``result["extra_tags"]``,
    using a human-readable name where one is known.
    """
    tags = audio.tags
    if not tags:
        return

    core_atoms = (("\xa9nam", "title"), ("\xa9ART", "artist"), ("\xa9alb", "album"))
    for atom, field in core_atoms:
        # An atom that exists but holds an empty list is treated as absent.
        if atom in tags and tags[atom]:
            result[field] = str(tags[atom][0])

    if "tmpo" in tags:
        try:
            result["bpm"] = int(tags["tmpo"][0])
        except (ValueError, TypeError, IndexError):
            pass

    if M4A_DANCE_FREEFORM in tags:
        # Freeform values are raw bytes. Decode leniently so one damaged
        # value cannot abort the read before extra_tags is collected.
        result["dances"] = [
            v.decode("utf-8", errors="replace")
            if isinstance(v, (bytes, MP4FreeForm))
            else str(v)
            for v in tags[M4A_DANCE_FREEFORM]
        ]

    # Human-readable names for MP4 atom keys (runtime keys, kept as-is).
    m4a_names = {
        "\xa9nam": "titel", "\xa9ART": "artist", "\xa9alb": "album",
        "\xa9day": "år", "\xa9gen": "genre", "\xa9wrt": "komponist",
        "\xa9cmt": "kommentar", "aART": "albumartist", "trkn": "spornummer",
        "disk": "disknummer", "cprt": "copyright", "\xa9lyr": "sangtekst",
        "tmpo": "bpm",
    }
    skip_keys = {"\xa9nam", "\xa9ART", "\xa9alb", "tmpo", M4A_DANCE_FREEFORM, "covr"}

    extra = {}
    for key, values in tags.items():
        if key in skip_keys:
            continue
        try:
            val = values[0]
        except (IndexError, KeyError, TypeError):
            # Empty list or a non-sequence value: nothing to show.
            continue
        if isinstance(val, (bytes, MP4FreeForm)):
            val = val.decode("utf-8", errors="replace")
        extra[m4a_names.get(key, key)] = str(val)
    result["extra_tags"] = extra
|
|
|
|
|
|
def _read_generic(audio, result: dict):
|
|
try:
|
|
easy = MutagenFile(result["local_path"], easy=True)
|
|
if easy and easy.tags:
|
|
result["title"] = easy.tags.get("title", [result["title"]])[0]
|
|
result["artist"] = easy.tags.get("artist", [""])[0]
|
|
result["album"] = easy.tags.get("album", [""])[0]
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
# ── Skrivning ─────────────────────────────────────────────────────────────────
|
|
|
|
def write_dances(path: str | Path, dances: list[str]) -> bool:
    """
    Write the dances into the file's own tags when the format supports it.

    Returns True when the format-specific writer reports success. Returns
    False when mutagen is unavailable, when the extension is not in
    WRITABLE_DANCE_FORMATS, or when the writer reports a failure (the
    writers in this module print the error and return False themselves).
    """
    if not MUTAGEN_AVAILABLE:
        return False

    path = Path(path)
    suffix = path.suffix.lower()
    if suffix not in WRITABLE_DANCE_FORMATS:
        return False

    # One writer per tag container.
    writers = {
        ".mp3": _write_mp3_dances,
        ".flac": _write_vorbis_dances,
        ".ogg": _write_vorbis_dances,
        ".opus": _write_vorbis_dances,
        ".m4a": _write_m4a_dances,
        ".aac": _write_m4a_dances,
    }
    writer = writers.get(suffix)
    if writer is None:
        return False
    return writer(path, dances)
|
|
|
|
|
|
def _write_mp3_dances(path: Path, dances: list[str]) -> bool:
    """Replace the dance TXXX frames of an MP3 file with ``dances``.

    Existing ``TXXX:LINEDANCE_DANCE_<n>`` frames are removed and one frame
    per dance is written, numbered from 1. A file that has no ID3 tag yet
    gets a new one instead of being reported as a failure.

    Returns True on success; on any error the message is printed and
    False is returned.
    """
    try:
        # Local import: the module-level import block only pulls in ID3/TXXX.
        from mutagen.id3 import ID3NoHeaderError

        try:
            tags = ID3(str(path))
        except ID3NoHeaderError:
            # Untagged MP3: start from an empty tag; save() will add it.
            tags = ID3()

        # Match on the exact frame-key prefix so unrelated frames that
        # merely contain the prefix text are left alone.
        dance_key_prefix = f"TXXX:{TXXX_DANCE_PREFIX}"
        for key in [k for k in tags.keys() if k.startswith(dance_key_prefix)]:
            del tags[key]

        for i, name in enumerate(dances, start=1):
            # encoding=3 is UTF-8.
            tags.add(TXXX(encoding=3, desc=f"{TXXX_DANCE_PREFIX}{i}", text=[name]))

        tags.save(str(path))
        return True
    except Exception as e:
        print(f"MP3 skrivefejl {path}: {e}")
        return False
|
|
|
|
|
|
def _write_vorbis_dances(path: Path, dances: list[str]) -> bool:
    """Replace the dance comments of a FLAC/OGG/Opus file with ``dances``.

    Removes every existing ``linedance_dance.<n>`` comment as well as the
    legacy single ``linedance_dance`` comment, then writes one numbered
    comment per dance. Removing the legacy comment matters because the
    reader falls back to it whenever no numbered comments exist, so a
    cleared dance list would otherwise reappear.

    Returns True on success; False when mutagen does not recognise the
    file or when an error occurs (the error is printed).
    """
    try:
        audio = MutagenFile(str(path), easy=False)
        if audio is None:
            return False
        if audio.tags is None:
            # No comment block yet: create one rather than giving up.
            audio.add_tags()

        numbered_prefix = f"{VORBIS_DANCE_KEY}."
        stale_keys = [
            k for k in audio.tags.keys()
            if k.lower() == VORBIS_DANCE_KEY or k.lower().startswith(numbered_prefix)
        ]
        for key in stale_keys:
            del audio.tags[key]

        # Write the new list: one comment per dance.
        for i, name in enumerate(dances, start=1):
            audio.tags[f"{VORBIS_DANCE_KEY}.{i}"] = name

        audio.save()
        return True
    except Exception as e:
        print(f"Vorbis skrivefejl {path}: {e}")
        return False
|
|
|
|
|
|
def _write_m4a_dances(path: Path, dances: list[str]) -> bool:
    """Store ``dances`` in the ``----:LINEDANCE:DANCE`` freeform atom of an M4A file.

    A file without any tags gets a tag container first. An empty dance
    list removes the atom instead of writing an empty one.

    Returns True on success; on any error the message is printed and
    False is returned.
    """
    try:
        audio = MP4(str(path))
        if audio.tags is None:
            audio.add_tags()

        if dances:
            audio.tags[M4A_DANCE_FREEFORM] = [
                MP4FreeForm(name.encode("utf-8")) for name in dances
            ]
        elif M4A_DANCE_FREEFORM in audio.tags:
            del audio.tags[M4A_DANCE_FREEFORM]

        audio.save()
        return True
    except Exception as e:
        print(f"M4A skrivefejl {path}: {e}")
        return False
|
|
|
|
|
|
# ── Hurtig læsning af kun danse (uden fuld tag-scan) ─────────────────────────
|
|
|
|
def read_dances_from_file(path: str | Path) -> list[str]:
    """Return only the dances stored in a file's tags.

    Convenience wrapper: runs a full read_tags() and returns its
    "dances" entry (an empty list when the entry is absent).
    """
    return read_tags(path).get("dances", [])
|
|
|
|
|
|
# ── BPM-analyse ───────────────────────────────────────────────────────────────
|
|
|
|
def analyze_bpm(path: str | Path) -> float | None:
|
|
"""
|
|
Analysér BPM fra lydfilen ved hjælp af librosa.
|
|
Returnerer BPM som float eller None ved fejl.
|
|
Tager 2-5 sekunder per sang — kør i baggrundstråd.
|
|
"""
|
|
try:
|
|
import librosa
|
|
# Indlæs kun de første 60 sekunder for hastighed
|
|
y, sr = librosa.load(str(path), duration=60.0, mono=True)
|
|
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
|
|
# librosa returnerer array i nyere versioner
|
|
if hasattr(tempo, "__len__"):
|
|
bpm = float(tempo[0]) if len(tempo) > 0 else 0.0
|
|
else:
|
|
bpm = float(tempo)
|
|
return round(bpm, 1) if bpm > 0 else None
|
|
except ImportError:
|
|
print("librosa ikke installeret — installer med: pip install librosa")
|
|
return None
|
|
except Exception as e:
|
|
print(f"BPM-analyse fejl for {path}: {e}")
|
|
return None
|
|
|
|
|
|
def analyze_and_save_bpm(path: str | Path, song_id: str) -> float | None:
    """Measure the BPM of a file and store it in SQLite; return the measured BPM.

    The value is written (rounded to an integer) only for the row whose
    stored bpm is NULL or 0, so an existing BPM is never overwritten.
    Database errors are printed and do not affect the return value.
    """
    measured = analyze_bpm(path)
    if not (measured and measured > 0):
        return measured

    try:
        from local.local_db import get_db

        update_sql = "UPDATE songs SET bpm=? WHERE id=? AND (bpm IS NULL OR bpm=0)"
        params = (int(round(measured)), song_id)
        with get_db() as conn:
            conn.execute(update_sql, params)
    except Exception as e:
        print(f"BPM gem fejl: {e}")

    return measured
|