299 lines
9.3 KiB
Python
299 lines
9.3 KiB
Python
"""
|
|
acoustid_worker.py — Background fingerprinting via AcoustID/Chromaprint.

Finds songs without an MBID, runs fpcalc on them, queries the AcoustID API
and stores the MBID + acoustid locally. Runs as a low-priority background thread.

Requires fpcalc (Chromaprint) to be installed:
    Linux:   sudo apt install libchromaprint-tools
    Windows: download from https://acoustid.org/chromaprint
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)

# AcoustID API key — can be overridden in Settings → Playback.
# Register your own at https://acoustid.org/new-application
ACOUSTID_API_KEY = "6fd9DGNDqG"
ACOUSTID_API_URL = "https://api.acoustid.org/v2/lookup"

# Pause between API calls (passed to time.sleep, i.e. seconds) — keeps the
# background run gentle.
API_DELAY = 5.0

# Max songs per session — spread across many startups.
# run_acoustid_scan uses this as the row limit for each batch it processes.
MAX_PER_SESSION = 20
|
|
|
|
|
|
def find_fpcalc() -> str | None:
|
|
"""Find fpcalc på systemet."""
|
|
import shutil, sys
|
|
|
|
# Tjek PATH
|
|
path = shutil.which("fpcalc")
|
|
if path:
|
|
return path
|
|
|
|
# Windows: tjek ved siden af exe
|
|
if sys.platform == "win32":
|
|
candidates = [
|
|
Path(sys.executable).parent / "fpcalc.exe",
|
|
Path("C:/Program Files/Chromaprint/fpcalc.exe"),
|
|
Path("C:/Program Files (x86)/Chromaprint/fpcalc.exe"),
|
|
]
|
|
for c in candidates:
|
|
if c.exists():
|
|
return str(c)
|
|
|
|
return None
|
|
|
|
|
|
def fingerprint_file(path: str, fpcalc: str) -> tuple[str, int] | None:
|
|
"""
|
|
Kør fpcalc på en fil og returnér (fingerprint, duration).
|
|
Returnerer None ved fejl.
|
|
"""
|
|
try:
|
|
result = subprocess.run(
|
|
[fpcalc, "-json", path],
|
|
capture_output=True, text=True, timeout=30
|
|
)
|
|
if result.returncode != 0:
|
|
return None
|
|
data = json.loads(result.stdout)
|
|
fp = data.get("fingerprint", "")
|
|
dur = int(data.get("duration", 0))
|
|
if fp and dur:
|
|
return fp, dur
|
|
except Exception as e:
|
|
logger.warning(f"fpcalc fejl for {path}: {e}")
|
|
return None
|
|
|
|
|
|
def lookup_acoustid(fingerprint: str, duration: int, api_key: str = "") -> dict | None:
    """Ask the AcoustID lookup API which recording a fingerprint belongs to.

    A GET request with ``meta=recordings`` is sent to ACOUSTID_API_URL, using
    ``api_key`` as the client key (ACOUSTID_API_KEY when empty). Of the
    returned results, the one with the highest score is used.

    Returns:
        ``{"acoustid": ..., "mbid": ...}`` where ``mbid`` is the id of the
        first recording on the best result (``""`` if it has none).
        ``None`` when the request fails, the API status is not ``"ok"``,
        there are no results, the best score is below 0.85, or the best
        result has no id.
    """
    try:
        import urllib.error
        import urllib.parse
        import urllib.request

        query = urllib.parse.urlencode({
            "client": api_key or ACOUSTID_API_KEY,
            "fingerprint": fingerprint,
            "duration": duration,
            "meta": "recordings",
        })
        request = urllib.request.Request(
            f"{ACOUSTID_API_URL}?{query}",
            headers={"User-Agent": "LineDancePlayer/1.0"},
        )

        try:
            with urllib.request.urlopen(request, timeout=10) as resp:
                data = json.loads(resp.read())
        except urllib.error.HTTPError as e:
            # HTTP-level errors carry a body that explains the failure.
            body = e.read().decode("utf-8", errors="replace")
            logger.warning(f"AcoustID API fejl {e.code}: {body[:200]}")
            return None

        if data.get("status") != "ok":
            logger.warning(f"AcoustID status: {data.get('status')} — {data.get('error', {}).get('message','')}")
            return None

        candidates = data.get("results", [])
        if not candidates:
            return None

        def score_of(entry):
            return entry.get("score", 0)

        # Use the best result (highest score); reject uncertain matches.
        top = max(candidates, key=score_of)
        if score_of(top) < 0.85:
            return None

        acoustid = top.get("id", "")
        recordings = top.get("recordings", [])
        if recordings:
            mbid = recordings[0].get("id", "")
        else:
            mbid = ""

        if not acoustid:
            return None
        return {"acoustid": acoustid, "mbid": mbid}

    except Exception as e:
        logger.warning(f"AcoustID API fejl: {e}")
        return None
|
|
|
|
|
|
def _acoustid_wait(seconds: float, stop_event) -> bool:
    """Pause for *seconds*; return True if a stop was requested meanwhile."""
    if stop_event is None:
        time.sleep(seconds)
        return False
    # Event.wait returns as soon as the flag is set, so stop() is honoured
    # immediately instead of after the full pause.
    return stop_event.wait(seconds)


def _acoustid_candidates(db_path: str, attempted: set) -> tuple[list, int]:
    """Fetch the next batch of rows to try and the count of rows still lacking an MBID."""
    import sqlite3

    conn = sqlite3.connect(db_path, timeout=10)
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.row_factory = sqlite3.Row
        rows = conn.execute("""
            SELECT s.id, s.title, s.artist, f.local_path
            FROM songs s
            JOIN files f ON f.song_id = s.id AND f.file_missing = 0
            WHERE (s.mbid IS NULL OR s.mbid = '')
            AND f.local_path IS NOT NULL AND f.local_path != ''
            ORDER BY RANDOM()
        """).fetchall()
    finally:
        conn.close()

    # Filter in Python rather than SQL: the attempted set can grow past
    # SQLite's bound-parameter limit.
    fresh = [r for r in rows if (r["id"], r["local_path"]) not in attempted]
    return fresh[:MAX_PER_SESSION], len(rows)


def _acoustid_store_match(conn, song_id, acoustid: str, mbid: str) -> bool:
    """Persist a lookup result; return True only if the MBID was actually stored."""
    import sqlite3

    # Commit the acoustid on its own first, so a rejected MBID update below
    # cannot roll it back.
    conn.execute(
        "UPDATE songs SET acoustid=? WHERE id=?",
        (acoustid or None, song_id),
    )
    conn.commit()

    if not mbid:
        return False

    try:
        cur = conn.execute(
            "UPDATE songs SET mbid=? WHERE id=? AND (mbid IS NULL OR mbid='')",
            (mbid, song_id),
        )
        conn.commit()
    except sqlite3.Error:
        conn.rollback()
        logger.debug(f"MBID {mbid[:8]} allerede i brug — springer over")
        return False
    # rowcount is 0 when the song received an MBID from elsewhere meanwhile.
    return cur.rowcount > 0


def run_acoustid_scan(db_path: str, api_key: str = "", on_progress=None, stop_event: threading.Event | None = None):
    """Fingerprint songs that have no MBID and store what AcoustID returns.

    Works in batches of at most MAX_PER_SESSION rows with a 60 second pause
    between batches. Each (song, file) row is tried at most once per call,
    so files that cannot be matched do not keep the loop (and the API)
    busy forever; they are picked up again on the next call.

    Args:
        db_path: Path to the SQLite database holding ``songs`` and ``files``.
        api_key: AcoustID client key; ACOUSTID_API_KEY is used when empty.
        on_progress: Optional callable invoked as
            ``on_progress(done, total, message)``.
        stop_event: Optional event; when set, the scan ends at the next
            check point (before each batch, before each song, or during a pause).

    Returns:
        None. Results are written to the database and, when an MBID was
        stored, to the audio file's tags via ``write_mbid_to_file``.
    """
    import sqlite3

    fpcalc = find_fpcalc()
    if not fpcalc:
        logger.info("AcoustID: fpcalc ikke fundet — spring over")
        if on_progress:
            on_progress(0, 0, "fpcalc ikke installeret")
        return

    key = api_key or ACOUSTID_API_KEY
    if not key:
        logger.warning("AcoustID: ingen API-nøgle — spring over")
        return

    logger.info(f"AcoustID: fpcalc fundet: {fpcalc}")

    def stop_requested() -> bool:
        return stop_event is not None and stop_event.is_set()

    batch_pause = 60      # seconds between batches (also quoted in the progress text)
    attempted = set()     # (song id, path) pairs already tried during this call
    batch_num = 0
    total_found = 0

    while True:
        if stop_requested():
            logger.info("AcoustID: stoppet af bruger")
            break

        rows, pending = _acoustid_candidates(db_path, attempted)
        if not rows:
            if pending:
                logger.info(
                    f"AcoustID: {pending} sange uden MBID er allerede forsøgt i denne session — stopper"
                )
            else:
                logger.info("AcoustID: alle sange har nu MBID — stopper")
            if on_progress:
                on_progress(1, 1, f"Færdig — {total_found} sange fik MBID i alt")
            break

        batch_num += 1
        total = len(rows)
        done = 0
        found = 0
        logger.info(f"AcoustID: batch {batch_num} — {total} sange")

        conn = sqlite3.connect(db_path, timeout=10)
        try:
            conn.execute("PRAGMA journal_mode=WAL")

            for row in rows:
                if stop_requested():
                    return

                path = row["local_path"]
                attempted.add((row["id"], path))

                if not Path(path).exists():
                    done += 1
                    continue

                if on_progress:
                    on_progress(done, total, row["title"] or path)

                fp_result = fingerprint_file(path, fpcalc)
                if not fp_result:
                    done += 1
                    time.sleep(0.1)
                    continue

                fingerprint, duration = fp_result
                result = lookup_acoustid(fingerprint, duration, key)

                if result:
                    mbid = result.get("mbid", "")
                    acoustid = result.get("acoustid", "")
                    stored = _acoustid_store_match(conn, row["id"], acoustid, mbid)
                    logger.info(
                        f"AcoustID: {row['title']} → MBID={mbid[:8] if mbid else '—'}"
                    )
                    if stored:
                        # Count and tag only what really ended up in the DB,
                        # so file tags and database stay consistent.
                        found += 1
                        total_found += 1
                        try:
                            from local.tag_reader import write_mbid_to_file
                            write_mbid_to_file(path, mbid)
                        except Exception as e:
                            logger.warning(f"AcoustID: kunne ikke skrive MBID til fil: {e}")

                done += 1

                # Throttle after every API call. The result is stored before
                # pausing so a stop during the pause does not lose it; the
                # stop check at the top of the loop ends the scan.
                _acoustid_wait(API_DELAY, stop_event)
        finally:
            # Always release the connection, also on early return or error.
            conn.close()

        logger.info(f"AcoustID: batch {batch_num} færdig — {found}/{total} fik MBID")

        if on_progress:
            on_progress(done, total, f"Batch {batch_num}: {found}/{total} fik MBID — venter {batch_pause} sek")

        # Wait a minute before the next batch, ending early on stop.
        if _acoustid_wait(batch_pause, stop_event):
            return
|
|
|
|
|
|
class AcoustIDWorker:
    """Runs ``run_acoustid_scan`` on a daemon background thread.

    Only one scan runs at a time per worker; ``stop()`` signals the running
    scan through a shared ``threading.Event``.
    """

    def __init__(self, db_path: str):
        self._running = False
        self._stop_event = threading.Event()
        self._thread = None
        self._db_path = db_path

    def _scan(self, api_key, on_progress):
        """Thread target: run the scan and always clear the running flag."""
        try:
            run_acoustid_scan(
                self._db_path,
                api_key=api_key,
                on_progress=on_progress,
                stop_event=self._stop_event,
            )
        finally:
            self._running = False

    def start(self, api_key: str = "", on_progress=None):
        """Start background fingerprinting; does nothing if already running."""
        if self._running:
            return

        # Reset a stop request left over from an earlier run.
        self._stop_event.clear()
        self._running = True

        worker = threading.Thread(
            target=self._scan,
            args=(api_key, on_progress),
            daemon=True,
            name="acoustid",
        )
        self._thread = worker
        worker.start()
        logger.info("AcoustID: baggrunds-worker startet")

    def stop(self):
        """Ask the background scan to stop."""
        self._stop_event.set()

    def is_running(self) -> bool:
        """Return True while a scan thread is active."""
        return self._running