"""YouTube search, stream preview and audio download helpers (yt-dlp + Qt threads)."""

import logging
import re
from pathlib import Path
from typing import Optional
from urllib.parse import quote_plus, urlparse

# NOTE: the mutagen names are not referenced in this module's visible code; they are
# kept because other modules may import them from here.
from mutagen.id3 import APIC, ID3, TALB, TIT2, TPE1, delete as id3_delete
from PyQt6.QtCore import QObject, QThread, pyqtSignal
from yt_dlp import YoutubeDL

import app.utils.paths as paths
from app.core.settings_manager import SettingsManager

logger = logging.getLogger(__name__)

# Compiled once: the progress hook is invoked many times per download.
_ANSI_COLOR_RE = re.compile(r"\x1b\[[0-9;]*m")
_PERCENT_RE = re.compile(r"(\d+(?:\.\d+)?)%")


class AudioDownloadWorker(QThread):
    """Thread that downloads the audio of a single URL with yt-dlp.

    Signals:
        progress_text: textual progress (percentage string, then the title).
        progress_value: integer progress clamped to 0..100.
        completed: emitted with the media title once the download succeeded.
        failed: emitted with ``str(exception)`` when anything goes wrong.
    """

    progress_text = pyqtSignal(str)
    progress_value = pyqtSignal(int)
    completed = pyqtSignal(str)
    failed = pyqtSignal(str)

    def __init__(
        self,
        url: str,
        output_dir: str,
        audio_format: str,
        audio_quality: str,
        ffmpeg_location: Optional[str] = None,
        track_metadata: Optional[dict] = None,
    ) -> None:
        """Store the download parameters; no work is done until ``start()``.

        Args:
            url: media URL handed to yt-dlp.
            output_dir: directory used in the yt-dlp output template.
            audio_format: ``"mp3"`` enables the FFmpeg extraction step and tagging;
                any other value downloads the best audio stream as-is.
            audio_quality: passed to yt-dlp as ``preferredquality``.
            ffmpeg_location: optional directory passed to yt-dlp as ``ffmpeg_location``.
            track_metadata: optional tag data; anything that is not a dict is
                replaced by an empty dict.
        """
        super().__init__()
        self.url = url
        self.output_dir = output_dir
        self.audio_format = audio_format
        self.audio_quality = audio_quality
        self.ffmpeg_location = ffmpeg_location
        self.track_metadata = track_metadata if isinstance(track_metadata, dict) else {}

    def _progress_hook(self, data: dict) -> None:
        """Translate a yt-dlp progress dict into the progress signals."""
        status = data.get("status")
        if status == "downloading":
            # yt-dlp may wrap the percentage in ANSI colour codes; strip them first.
            percent_text = _ANSI_COLOR_RE.sub("", str(data.get("_percent_str", "")).strip())
            percent_match = _PERCENT_RE.search(percent_text)
            if percent_match:
                value = int(float(percent_match.group(1)))
                self.progress_value.emit(max(0, min(100, value)))
            self.progress_text.emit(percent_text or "Downloading...")
        elif status == "finished":
            self.progress_value.emit(100)

    def _build_ydl_options(self) -> dict:
        """Build the yt-dlp option dict for this download."""
        ydl_opts: dict = {
            "format": "bestaudio/best",
            "noplaylist": True,
            "outtmpl": str(Path(self.output_dir) / "%(title)s.%(ext)s"),
            "progress_hooks": [self._progress_hook],
            "quiet": True,
            "no_warnings": True,
        }
        if self.ffmpeg_location:
            ydl_opts["ffmpeg_location"] = self.ffmpeg_location
        if self.audio_format == "mp3":
            ydl_opts["postprocessors"] = [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": "mp3",
                    "preferredquality": self.audio_quality,
                }
            ]
        return ydl_opts

    def run(self) -> None:
        """Download the audio, tag it when the format is mp3, and report the result."""
        try:
            with YoutubeDL(self._build_ydl_options()) as ydl:
                info = ydl.extract_info(self.url, download=True)
            # ``or`` also covers a "title" key that is present but None, which the
            # str-typed signals below would reject.
            title = info.get("title") or "audio"
            self.progress_text.emit(title)
            if self.audio_format == "mp3":
                downloaded_file = self._resolve_downloaded_file(info)
                if downloaded_file:
                    self._apply_mp3_tags(downloaded_file, title)
            self.completed.emit(title)
        except Exception as error:  # thread boundary: report instead of crashing
            self.failed.emit(str(error))

    @staticmethod
    def _paths_from(mapping: object) -> list[Path]:
        """Return the non-empty ``filepath`` / ``_filename`` values of *mapping* as paths."""
        if not isinstance(mapping, dict):
            return []
        found: list[Path] = []
        for key in ("filepath", "_filename"):
            value = mapping.get(key)
            if isinstance(value, str) and value:
                found.append(Path(value))
        return found

    def _resolve_downloaded_file(self, info: dict) -> Optional[Path]:
        """Locate the mp3 file produced by the download.

        Paths reported in ``info["requested_downloads"]`` are tried first, then the
        top-level ``filepath`` / ``_filename`` keys. Each candidate is accepted if it
        is an existing ``.mp3`` file, or if the same path with an ``.mp3`` suffix
        exists. As a last resort the most recently modified ``*.mp3`` in the output
        directory is returned.

        Returns:
            The resolved path, or ``None`` when nothing was found.
        """
        candidates: list[Path] = []
        requested_downloads = info.get("requested_downloads") if isinstance(info, dict) else None
        if isinstance(requested_downloads, list):
            for download_data in requested_downloads:
                candidates.extend(self._paths_from(download_data))
        candidates.extend(self._paths_from(info))

        for candidate in candidates:
            if candidate.is_file() and candidate.suffix.lower() == ".mp3":
                return candidate
            mp3_candidate = candidate.with_suffix(".mp3")
            if mp3_candidate.is_file():
                return mp3_candidate

        output_dir = Path(self.output_dir)
        if output_dir.exists():
            return max(
                output_dir.glob("*.mp3"),
                key=lambda file_path: file_path.stat().st_mtime,
                default=None,
            )
        return None

    def _apply_mp3_tags(self, file_path: Path, fallback_title: str) -> None:
        """Tag and rename the downloaded mp3 through ``MetadataManager``."""
        # Imported at call time, as in the original code (presumably to avoid an
        # import cycle -- TODO confirm).
        from app.core.metadata_manager import MetadataManager

        metadata_manager = MetadataManager()
        metadata_manager.apply_mp3_tags(file_path, self.track_metadata, fallback_title)
        renamed_file = metadata_manager.rename_output_file(
            file_path,
            self.track_metadata.get("mp3_title") or fallback_title,
            self.track_metadata.get("artist", ""),
        )
        if renamed_file is not None:
            self.progress_text.emit(renamed_file.stem)


class YoutubeSearchWorker(QThread):
    """Thread that searches YouTube for videos and playlists matching a query.

    Signals:
        finished_search: list of normalized result dicts (videos first, then
            playlists, de-duplicated by URL).
        failed_search: emitted with ``"search_failed"`` on any error.
    """

    finished_search = pyqtSignal(list)
    failed_search = pyqtSignal(str)

    def __init__(self, query: str) -> None:
        """Remember the search *query*; the search runs in ``run()``."""
        super().__init__()
        self.query = query

    def run(self) -> None:
        """Run the video and playlist searches and emit the merged results."""
        try:
            ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
            with YoutubeDL(ydl_opts) as ydl:
                video_result = ydl.extract_info(f"ytsearch10:{self.query}", download=False)
                playlist_url = f"https://www.youtube.com/results?search_query={quote_plus(self.query)}&sp=EgIQAw%253D%253D"
                playlist_result = ydl.extract_info(playlist_url, download=False)

            results: list[dict] = []
            seen_urls: set[str] = set()
            sources = (
                (video_result, self._normalize_video_entry),
                (playlist_result, self._normalize_playlist_entry),
            )
            for raw_result, normalize in sources:
                entries = (raw_result.get("entries") or []) if isinstance(raw_result, dict) else []
                for entry in entries:
                    item = normalize(entry)
                    if item and item["url"] not in seen_urls:
                        seen_urls.add(item["url"])
                        results.append(item)
            self.finished_search.emit(results)
        except Exception:  # thread boundary: report instead of crashing
            logger.exception("YouTube search failed")
            self.failed_search.emit("search_failed")

    def _normalize_video_entry(self, entry: dict) -> Optional[dict]:
        """Convert a yt-dlp video entry into the result dict used by the UI.

        Returns ``None`` when *entry* is not a dict or has no ``id``.
        """
        if not isinstance(entry, dict):
            return None
        video_id = entry.get("id")
        if not video_id:
            return None

        # The duration may be missing/None or a float; a bad value must only blank
        # the duration column, not abort the whole search.
        try:
            raw_duration: Optional[int] = int(entry.get("duration"))
        except (TypeError, ValueError, OverflowError):
            raw_duration = None

        return {
            "kind": "video",
            "title": entry.get("title") or "Untitled",
            "url": f"https://www.youtube.com/watch?v={video_id}",
            "channel": entry.get("uploader") or entry.get("channel") or "",
            "duration": self._format_duration(raw_duration),
            "item_count": "",
            "thumbnail_url": f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg",
            "thumbnail_bytes": b"",
        }

    def _normalize_playlist_entry(self, entry: dict) -> Optional[dict]:
        """Convert a yt-dlp playlist entry into the result dict used by the UI.

        Returns ``None`` when *entry* is not a dict or has no ``id``.
        """
        if not isinstance(entry, dict):
            return None
        playlist_id = entry.get("id")
        if not playlist_id:
            return None

        count = entry.get("playlist_count") or entry.get("n_entries") or 0
        thumbnails = entry.get("thumbnails") or []
        thumbnail_url = ""
        if thumbnails and isinstance(thumbnails, list):
            # The last listed thumbnail is used.
            thumbnail_url = (thumbnails[-1] or {}).get("url", "")
        if not thumbnail_url:
            thumbnail_url = f"https://i.ytimg.com/vi/{playlist_id}/hqdefault.jpg"

        return {
            "kind": "playlist",
            "title": entry.get("title") or "Untitled playlist",
            "url": f"https://www.youtube.com/playlist?list={playlist_id}",
            "channel": entry.get("uploader") or entry.get("channel") or "",
            "duration": "",
            "item_count": str(count) if count else "",
            "thumbnail_url": thumbnail_url,
            "thumbnail_bytes": b"",
        }

    def _format_duration(self, seconds: Optional[int]) -> str:
        """Format *seconds* as ``H:MM:SS`` or ``M:SS``; return ``""`` if not a non-negative int."""
        if not isinstance(seconds, int) or seconds < 0:
            return ""
        minutes, sec = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        if hours:
            return f"{hours}:{minutes:02}:{sec:02}"
        return f"{minutes}:{sec:02}"


class PreviewStreamWorker(QThread):
    """Thread that resolves a direct stream URL for previewing a video.

    Signals:
        finished_preview: ``(stream_url, title)`` on success.
        failed_preview: emitted with ``"preview_unavailable"`` otherwise.
    """

    finished_preview = pyqtSignal(str, str)
    failed_preview = pyqtSignal(str)

    def __init__(self, url: str, title: str) -> None:
        """Remember the video *url* and the *title* echoed back with the result."""
        super().__init__()
        self.url = url
        self.title = title

    def run(self) -> None:
        """Ask yt-dlp (without downloading) for a stream URL and emit the outcome."""
        try:
            ydl_opts = {
                "quiet": True,
                "no_warnings": True,
                "format": "best[height<=720][vcodec!=none][acodec!=none]/best[ext=mp4][height<=720]/best",
                "noplaylist": True,
            }
            with YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(self.url, download=False)
            stream_url = info.get("url") if isinstance(info, dict) else None
            if stream_url:
                self.finished_preview.emit(stream_url, self.title)
            else:
                self.failed_preview.emit("preview_unavailable")
        except Exception:  # thread boundary: report instead of crashing
            logger.exception("Preview stream resolution failed")
            self.failed_preview.emit("preview_unavailable")


class YoutubeManager(QObject):
    """Owns the track list and drives search, preview and download workers.

    Errors are reported through the ``failed`` / ``preview_failed`` signals as short
    string codes (e.g. ``"invalid_youtube_url"``); most methods also return a bool.
    """

    started = pyqtSignal()
    progress_text = pyqtSignal(str)
    progress_value = pyqtSignal(int)
    completed = pyqtSignal(str)
    failed = pyqtSignal(str)
    list_changed = pyqtSignal(list)
    queue_finished = pyqtSignal()
    downloading_state_changed = pyqtSignal(bool)
    search_started = pyqtSignal()
    search_results = pyqtSignal(list)
    preview_started = pyqtSignal()
    preview_ready = pyqtSignal(str, str)
    preview_failed = pyqtSignal(str)

    def __init__(self, settings_manager: SettingsManager) -> None:
        """Create an empty manager that reads audio settings from *settings_manager*."""
        super().__init__()
        self.settings_manager = settings_manager
        self.download_worker: Optional[AudioDownloadWorker] = None
        self.search_worker: Optional[YoutubeSearchWorker] = None
        self.preview_worker: Optional[PreviewStreamWorker] = None
        self.track_list: list[dict[str, str]] = []
        self.download_queue: list[dict[str, str]] = []
        # Non-empty only while a "download all" batch is in progress.
        self.queue_output_dir: str = ""
        self.last_playlist_added_count: int = 0

    def apply_album_cover(self, album_name: str, cover_path: str) -> None:
        """Apply *cover_path* to every track whose album equals *album_name*.

        Each matching track is updated through ``update_track_metadata``, so
        ``list_changed`` is emitted once per updated track.
        """
        for idx, track in enumerate(self.track_list):
            if str(track.get("album", "")).strip() == album_name:
                self.update_track_metadata(
                    idx,
                    str(track.get("mp3_title") or track.get("title", "")),
                    str(track.get("artist", "")),
                    album_name,
                    cover_path,
                )

    def get_tracks(self) -> list[dict[str, str]]:
        """Return a shallow copy of the track list (the track dicts are shared)."""
        return list(self.track_list)

    def _build_track_payload(
        self,
        title: str,
        url: str,
        artist: str = "",
        album: str = "",
        cover_path: str = "",
    ) -> dict[str, str]:
        """Build a track dict with all text fields stripped; ``mp3_title`` starts as the title."""
        clean_title = title.strip()
        return {
            "title": clean_title,
            "url": url.strip(),
            "mp3_title": clean_title,
            "artist": artist.strip(),
            "album": album.strip(),
            "cover_path": cover_path.strip(),
        }

    def _append_track(self, title: str, url: str) -> None:
        """Append a new track and notify listeners."""
        self.track_list.append(self._build_track_payload(title=title, url=url))
        self.list_changed.emit(self.get_tracks())

    def _is_valid_index(self, index: int) -> bool:
        """Return True if *index* addresses an existing track."""
        return 0 <= index < len(self.track_list)

    def add_track(self, url: str) -> bool:
        """Add a track by URL, fetching its title with yt-dlp (blocking call).

        Emits ``failed`` with ``"invalid_youtube_url"`` or ``"metadata_fetch_failed"``
        and returns False on error.
        """
        clean_url = url.strip()
        if not self.is_valid_youtube_url(clean_url):
            self.failed.emit("invalid_youtube_url")
            return False
        title = self._extract_title_from_url(clean_url)
        if not title:
            self.failed.emit("metadata_fetch_failed")
            return False
        self._append_track(title, clean_url)
        return True

    def add_search_result(self, search_result: dict) -> bool:
        """Add a search result: a single video, or every track of a playlist.

        ``last_playlist_added_count`` is reset to 0 and set to the number of tracks
        added when the result is a playlist.
        """
        self.last_playlist_added_count = 0
        kind = search_result.get("kind", "video")
        url = search_result.get("url", "")
        title = search_result.get("title", "")
        if kind != "playlist":
            return self.add_track_with_title(url, title)

        tracks = self._extract_playlist_tracks(url)
        if not tracks:
            self.failed.emit("playlist_expand_failed")
            return False
        self.track_list.extend(tracks)
        self.last_playlist_added_count = len(tracks)
        self.list_changed.emit(self.get_tracks())
        return True

    def add_track_with_title(self, url: str, title: str) -> bool:
        """Add a track whose title is already known (no network access)."""
        clean_url = url.strip()
        clean_title = title.strip()
        if not self.is_valid_youtube_url(clean_url):
            self.failed.emit("invalid_youtube_url")
            return False
        if not clean_title:
            self.failed.emit("metadata_fetch_failed")
            return False
        self._append_track(clean_title, clean_url)
        return True

    def update_track_metadata(self, index: int, mp3_title: str, artist: str, album: str, cover_path: str) -> bool:
        """Overwrite the editable tag fields of the track at *index* (values are stripped)."""
        if not self._is_valid_index(index):
            self.failed.emit("no_track_selected")
            return False
        self.track_list[index].update(
            {
                "mp3_title": mp3_title.strip(),
                "artist": artist.strip(),
                "album": album.strip(),
                "cover_path": cover_path.strip(),
            }
        )
        self.list_changed.emit(self.get_tracks())
        return True

    def search_tracks(self, query: str) -> bool:
        """Start an asynchronous search; results arrive on ``search_results``.

        Rejects queries shorter than 2 characters (after stripping) and refuses to
        start while a previous search is still running.
        """
        clean_query = query.strip()
        if len(clean_query) < 2:
            self.failed.emit("invalid_search_query")
            return False
        if self.search_worker and self.search_worker.isRunning():
            self.failed.emit("search_in_progress")
            return False
        self.search_worker = YoutubeSearchWorker(clean_query)
        self.search_worker.finished_search.connect(self.search_results.emit)
        self.search_worker.failed_search.connect(self.failed.emit)
        self.search_started.emit()
        self.search_worker.start()
        return True

    def start_preview(self, search_result: dict) -> bool:
        """Start resolving a preview stream for a video search result.

        Failures are reported on ``preview_failed``; success on ``preview_ready``.
        """
        url = search_result.get("url", "")
        title = search_result.get("title", "")
        kind = search_result.get("kind", "video")
        if kind != "video":
            self.preview_failed.emit("preview_playlist_unavailable")
            return False
        if not self.is_valid_youtube_url(url):
            self.preview_failed.emit("invalid_youtube_url")
            return False
        if self.preview_worker and self.preview_worker.isRunning():
            self.preview_failed.emit("preview_in_progress")
            return False
        self.preview_worker = PreviewStreamWorker(url, title)
        self.preview_worker.finished_preview.connect(self.preview_ready.emit)
        self.preview_worker.failed_preview.connect(self.preview_failed.emit)
        self.preview_started.emit()
        self.preview_worker.start()
        return True

    def remove_track(self, index: int) -> bool:
        """Remove the track at *index*; emits ``failed("no_track_selected")`` if out of range."""
        if not self._is_valid_index(index):
            self.failed.emit("no_track_selected")
            return False
        self.track_list.pop(index)
        self.list_changed.emit(self.get_tracks())
        return True

    def clear_tracks(self) -> None:
        """Remove every track and notify listeners."""
        self.track_list.clear()
        self.list_changed.emit(self.get_tracks())

    def download_track_at(self, index: int, output_dir: str) -> bool:
        """Download the single track at *index* into *output_dir*."""
        if not self._is_valid_index(index):
            self.failed.emit("no_track_selected")
            return False
        track = self.track_list[index]
        return self.download_audio(track["url"], output_dir, track)

    def download_all_tracks(self, output_dir: str) -> bool:
        """Download every track sequentially into *output_dir*.

        Returns:
            True if the first download was started, False otherwise (the reason is
            emitted on ``failed``).
        """
        if not self.track_list:
            self.failed.emit("empty_download_list")
            return False
        if not output_dir or not Path(output_dir).exists():
            self.failed.emit("invalid_output_folder")
            return False
        if self.download_worker and self.download_worker.isRunning():
            self.failed.emit("download_in_progress")
            return False
        self.download_queue = list(self.track_list)
        self.queue_output_dir = output_dir
        return self._download_next_in_queue()

    def download_audio(self, url: str, output_dir: str, track_metadata: Optional[dict] = None) -> bool:
        """Start a background download of *url* into *output_dir*.

        Audio format/quality come from the settings manager. Returns False (after
        emitting ``failed``) if the URL or folder is invalid or a download is
        already running.
        """
        if not self.is_valid_youtube_url(url):
            self.failed.emit("invalid_youtube_url")
            return False
        if not output_dir or not Path(output_dir).exists():
            self.failed.emit("invalid_output_folder")
            return False
        if self.download_worker and self.download_worker.isRunning():
            self.failed.emit("download_in_progress")
            return False

        worker = AudioDownloadWorker(
            url,
            output_dir,
            self.settings_manager.get_audio_format(),
            self.settings_manager.get_audio_quality(),
            self.resolve_ffmpeg_location(),
            track_metadata,
        )
        worker.progress_text.connect(self.progress_text.emit)
        worker.progress_value.connect(self.progress_value.emit)
        worker.completed.connect(self._on_worker_completed)
        worker.failed.connect(self._on_worker_failed)
        self.download_worker = worker

        self.started.emit()
        self.downloading_state_changed.emit(True)
        worker.start()
        return True

    def _reset_queue(self) -> None:
        """Drop any pending batch state and announce that downloading stopped."""
        self.download_queue = []
        self.queue_output_dir = ""
        self.downloading_state_changed.emit(False)

    def _download_next_in_queue(self) -> bool:
        """Start the next queued download, or finish the batch when the queue is empty.

        Returns:
            True if a download was started.
        """
        if not self.download_queue:
            self._reset_queue()
            self.queue_finished.emit()
            return False
        next_track = self.download_queue.pop(0)
        if self.download_audio(next_track["url"], self.queue_output_dir, next_track):
            return True
        # download_audio already emitted the reason on ``failed``. Abort the batch so
        # the remaining items do not linger and the UI is not left in a busy state.
        self._reset_queue()
        return False

    def _on_worker_completed(self, title: str) -> None:
        """Relay completion, then continue the batch or leave the downloading state."""
        self.completed.emit(title)
        if self.queue_output_dir:
            # Batch mode: this either starts the next track or, when the queue is
            # empty, resets the state and emits ``queue_finished``.
            self._download_next_in_queue()
            return
        self.downloading_state_changed.emit(False)

    def _on_worker_failed(self, error_text: str) -> None:
        """Abort any batch and map the worker error text to a ``failed`` code."""
        self._reset_queue()
        if "ffmpeg" in error_text.lower():
            self.failed.emit("ffmpeg_required")
        else:
            self.failed.emit("audio_download_error")

    def is_valid_youtube_url(self, url: str) -> bool:
        """Return True if *url* is an http(s) URL hosted on youtube.com or youtu.be.

        The host name is checked (including sub-domains such as ``www.`` or
        ``music.``) rather than searching the whole URL, so a foreign URL that merely
        contains "youtube.com" in its path or query is rejected.
        """
        if not isinstance(url, str) or not url.startswith(("http://", "https://")):
            return False
        try:
            host = (urlparse(url).hostname or "").rstrip(".")
        except ValueError:  # malformed netloc, e.g. unbalanced IPv6 brackets
            return False
        return any(host == domain or host.endswith("." + domain) for domain in ("youtube.com", "youtu.be"))

    def _extract_title_from_url(self, url: str) -> Optional[str]:
        """Fetch the title of *url* with yt-dlp; return ``None`` on any failure."""
        try:
            ydl_opts = {"quiet": True, "no_warnings": True}
            with YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)
            return info.get("title") if isinstance(info, dict) else None
        except Exception:
            logger.warning("Could not fetch title for %s", url, exc_info=True)
            return None

    def _extract_playlist_tracks(self, playlist_url: str) -> list[dict[str, str]]:
        """Expand a playlist URL into track payloads.

        Flat extraction is tried first; if it fails or yields nothing, a full
        extraction is attempted. Returns an empty list when both attempts fail.
        """
        for flat in (True, False):
            try:
                ydl_opts: dict = {"quiet": True, "no_warnings": True}
                if flat:
                    ydl_opts["extract_flat"] = True
                with YoutubeDL(ydl_opts) as ydl:
                    info = ydl.extract_info(playlist_url, download=False)

                entries = (info.get("entries") or []) if isinstance(info, dict) else []
                tracks: list[dict[str, str]] = []
                for entry in entries:
                    if not isinstance(entry, dict):
                        continue
                    video_id = entry.get("id")
                    url = entry.get("url")
                    if video_id:
                        video_url = f"https://www.youtube.com/watch?v={video_id}"
                    elif isinstance(url, str) and url.startswith("http"):
                        video_url = url
                    elif isinstance(url, str) and url:
                        # A non-http "url" is treated as a bare video id.
                        video_url = f"https://www.youtube.com/watch?v={url}"
                    else:
                        continue
                    tracks.append(self._build_track_payload(title=entry.get("title") or "Untitled", url=video_url))
                if tracks:
                    return tracks
            except Exception as error:
                logger.warning("[yt-dlp playlist extraction error] flat=%s: %s", flat, error)
        return []

    def resolve_ffmpeg_location(self) -> Optional[str]:
        """Return the directory of the first ``ffmpeg.exe`` found, or ``None``.

        Looks in ``others/`` and ``data/others/`` relative to the working directory,
        then at the same relative paths resolved through ``paths.resource_path``.
        """
        candidates = [
            Path("others") / "ffmpeg.exe",
            Path("data") / "others" / "ffmpeg.exe",
            Path(paths.resource_path("others/ffmpeg.exe")),
            Path(paths.resource_path("data/others/ffmpeg.exe")),
        ]
        for candidate in candidates:
            if candidate.exists():
                return str(candidate.parent)
        return None