""" scanner.py — Scanning af musikbiblioteker i baggrunden. Kører som en separat subprocess der scanner ét bibliotek ad gangen og rapporterer fremgang via stdout JSON-linjer. Kan også importeres direkte og bruges via ScanWorker QThread. """ import os import sys import json import sqlite3 import uuid import logging from pathlib import Path logger = logging.getLogger(__name__) SUPPORTED = {'.mp3', '.flac', '.m4a', '.ogg', '.wav', '.aiff', '.wma'} def is_supported(path: Path) -> bool: return path.suffix.lower() in SUPPORTED def get_file_mtime(path: Path) -> str: try: return str(os.path.getmtime(str(path))) except Exception: return "" def scan_library(library_id: int, library_path: str, db_path: str, overwrite_bpm: bool = False, progress_callback=None): """ Scan ét bibliotek og upsert sange til SQLite. progress_callback(done, total, current_file) kaldes løbende. """ from local.tag_reader import read_tags conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row base = Path(library_path) if not base.exists(): conn.close() return 0 # Byg indeks over kendte filer known = {} for row in conn.execute( "SELECT local_path, file_modified_at, file_missing FROM songs WHERE library_id=?", (library_id,) ).fetchall(): # Sange markeret som manglende medtages ikke i known — de skal altid genscanes if not row["file_missing"]: known[row["local_path"]] = row["file_modified_at"] # Find alle musikfiler all_files = [] for dirpath, _, filenames in os.walk(str(base), followlinks=False): for fn in filenames: fp = Path(dirpath) / fn if is_supported(fp): all_files.append(fp) total = len(all_files) done = 0 import time for fp in all_files: path_str = str(fp) mtime = get_file_mtime(fp) if progress_callback: progress_callback(done, total, fp.name) # Spring over hvis ikke ændret if path_str in known and known[path_str] == mtime: done += 1 # Yield hvert 100. fil så andre tråde kan køre if done % 100 == 0: time.sleep(0.005) continue try: tags = read_tags(fp) extra = json.dumps(tags.get("extra_tags", {}), ensure_ascii=False) # Match 0: MBID-match — sikrest mulige match existing = None mbid_from_file = tags.get("mbid", "") if mbid_from_file: existing = conn.execute( "SELECT id, bpm FROM songs WHERE mbid=? LIMIT 1", (mbid_from_file,) ).fetchone() if existing: conn.execute( "UPDATE songs SET local_path=? WHERE id=?", (path_str, existing["id"]) ) # Match 1: præcis sti-match if not existing: existing = conn.execute( "SELECT id, bpm FROM songs WHERE local_path=?", (path_str,) ).fetchone() # Match 2: titel+artist match — fil er flyttet eller var missing if not existing: title = tags.get("title", "") artist = tags.get("artist", "") if title: # Prioritér file_missing=1 sange, men tag også sange med ugyldig sti existing = conn.execute(""" SELECT id, bpm FROM songs WHERE title=? AND artist=? AND file_missing=1 LIMIT 1 """, (title, artist)).fetchone() if not existing: # Tjek om der er en sang med samme titel+artist men ugyldig sti existing = conn.execute(""" SELECT id, bpm, local_path FROM songs WHERE title=? AND artist=? AND file_missing=0 LIMIT 1 """, (title, artist)).fetchone() if existing: from pathlib import Path as _Path old_path = existing["local_path"] or "" if old_path and not _Path(old_path).exists(): pass # Sti er ugyldig — brug dette match else: existing = None # Sti er valid — det er en anden fil if existing: # Opdater stien så den peger på den nye placering conn.execute( "UPDATE songs SET local_path=? WHERE id=?", (path_str, existing["id"]) ) if existing: bpm = tags.get("bpm", 0) if not overwrite_bpm and existing["bpm"] and existing["bpm"] > 0: bpm = existing["bpm"] # behold eksisterende BPM mbid = tags.get("mbid", "") conn.execute(""" UPDATE songs SET library_id=?, title=?, artist=?, album=?, bpm=?, duration_sec=?, file_format=?, file_modified_at=?, file_missing=0, extra_tags=?, mbid=CASE WHEN ? != '' THEN ? ELSE mbid END WHERE id=? """, (library_id, tags.get("title",""), tags.get("artist",""), tags.get("album",""), bpm, tags.get("duration_sec",0), tags.get("file_format",""), mtime, extra, mbid, mbid, existing["id"])) song_id = existing["id"] else: song_id = str(uuid.uuid4()) conn.execute(""" INSERT OR IGNORE INTO songs (id, library_id, local_path, title, artist, album, bpm, duration_sec, file_format, file_modified_at, extra_tags, mbid) VALUES (?,?,?,?,?,?,?,?,?,?,?,?) """, (song_id, library_id, path_str, tags.get("title",""), tags.get("artist",""), tags.get("album",""), tags.get("bpm",0), tags.get("duration_sec",0), tags.get("file_format",""), mtime, extra, tags.get("mbid",""))) # Importer dans-tags fra filen hvis de ikke allerede er i DB file_dances = tags.get("dances", []) if file_dances: existing_dances = conn.execute( "SELECT COUNT(*) FROM song_dances WHERE song_id=?", (song_id,) ).fetchone()[0] if existing_dances == 0: for order, dance_name in enumerate(file_dances, start=1): dance_row = conn.execute( "SELECT id FROM dances WHERE name=? COLLATE NOCASE LIMIT 1", (dance_name,) ).fetchone() if not dance_row: cur = conn.execute( "INSERT INTO dances (name) VALUES (?)", (dance_name,) ) dance_id = cur.lastrowid else: dance_id = dance_row["id"] conn.execute( "INSERT OR IGNORE INTO song_dances (song_id, dance_id, dance_order) VALUES (?,?,?)", (song_id, dance_id, order) ) conn.commit() except Exception as e: logger.warning(f"Scan fejl {fp.name}: {e}") done += 1 # Lille pause efter hver scannet fil så GUI ikke hænger time.sleep(0.02) # Marker manglende filer for path_str in known: if not Path(path_str).exists(): conn.execute( "UPDATE songs SET file_missing=1 WHERE local_path=?", (path_str,) ) conn.commit() conn.execute( "UPDATE libraries SET last_full_scan=datetime('now') WHERE id=?", (library_id,) ) conn.commit() conn.close() return done # ── Subprocess entry point ───────────────────────────────────────────────────── if __name__ == "__main__": """ Kørsel som subprocess: python scanner.py Rapporterer JSON-linjer til stdout: {"done":N,"total":M,"file":"..."} """ if len(sys.argv) < 4: sys.exit(1) lib_id = int(sys.argv[1]) lib_path = sys.argv[2] db_path = sys.argv[3] # Tilføj app-mappen til path så local.tag_reader kan importeres app_dir = str(Path(__file__).parent.parent) if app_dir not in sys.path: sys.path.insert(0, app_dir) def report(done, total, filename): print(json.dumps({"done": done, "total": total, "file": filename}), flush=True) count = scan_library(lib_id, lib_path, db_path, progress_callback=report) print(json.dumps({"done": count, "total": count, "finished": True}), flush=True)