YoutubeDownloader/app/core/download_manager.py
2026-03-18 16:35:14 +01:00

729 lines
28 KiB
Python

from pathlib import Path
import re
from urllib.parse import quote_plus
from typing import Optional
from PyQt6.QtCore import QObject, QThread, pyqtSignal
from mutagen.id3 import APIC, ID3, TALB, TIT2, TPE1, delete as id3_delete
from yt_dlp import YoutubeDL
import app.utils.paths as paths
from app.core.settings_manager import SettingsManager
class AudioDownloadWorker(QThread):
progress_text = pyqtSignal(str)
progress_value = pyqtSignal(int)
completed = pyqtSignal(str)
failed = pyqtSignal(str)
def __init__(self, url: str, output_dir: str, audio_format: str, audio_quality: str, ffmpeg_location: Optional[str] = None, track_metadata: Optional[dict] = None) -> None:
super().__init__()
self.url = url
self.output_dir = output_dir
self.audio_format = audio_format
self.audio_quality = audio_quality
self.ffmpeg_location = ffmpeg_location
self.track_metadata = track_metadata if isinstance(track_metadata, dict) else {}
def _progress_hook(self, data: dict) -> None:
status = data.get("status")
if status == "downloading":
percent_text = str(data.get("_percent_str", "")).strip()
percent_text = re.sub(r"\x1b\[[0-9;]*m", "", percent_text)
percent_match = re.search(r"(\d+(?:\.\d+)?)%", percent_text)
if percent_match:
value = int(float(percent_match.group(1)))
self.progress_value.emit(max(0, min(100, value)))
self.progress_text.emit(percent_text or "Downloading...")
elif status == "finished":
self.progress_value.emit(100)
def run(self) -> None:
try:
ydl_opts = {
"format": "bestaudio/best",
"noplaylist": True,
"outtmpl": str(Path(self.output_dir) / "%(title)s.%(ext)s"),
"progress_hooks": [self._progress_hook],
"quiet": True,
"no_warnings": True,
}
if self.ffmpeg_location:
ydl_opts["ffmpeg_location"] = self.ffmpeg_location
if self.audio_format == "mp3":
ydl_opts.update(
{
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
"preferredquality": self.audio_quality,
}
]
}
)
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(self.url, download=True)
title = info.get("title", "audio")
self.progress_text.emit(title)
if self.audio_format == "mp3":
downloaded_file = self._resolve_downloaded_file(info)
if downloaded_file:
self._apply_mp3_tags(downloaded_file, title)
self.completed.emit(title)
except Exception as error:
self.failed.emit(str(error))
def _resolve_downloaded_file(self, info: dict) -> Optional[Path]:
candidates: list[Path] = []
requested_downloads = info.get("requested_downloads") if isinstance(info, dict) else None
if isinstance(requested_downloads, list):
for download_data in requested_downloads:
if not isinstance(download_data, dict):
continue
for key in ("filepath", "_filename"):
candidate_path = download_data.get(key)
if isinstance(candidate_path, str) and candidate_path:
candidates.append(Path(candidate_path))
for key in ("filepath", "_filename"):
candidate_path = info.get(key) if isinstance(info, dict) else None
if isinstance(candidate_path, str) and candidate_path:
candidates.append(Path(candidate_path))
for candidate in candidates:
if candidate.exists() and candidate.is_file() and candidate.suffix.lower() == ".mp3":
return candidate
mp3_candidate = candidate.with_suffix(".mp3")
if mp3_candidate.exists() and mp3_candidate.is_file():
return mp3_candidate
output_dir = Path(self.output_dir)
if output_dir.exists():
mp3_files = sorted(output_dir.glob("*.mp3"), key=lambda file_path: file_path.stat().st_mtime, reverse=True)
if mp3_files:
return mp3_files[0]
return None
def _apply_mp3_tags(self, file_path: Path, fallback_title: str) -> None:
metadata = self.track_metadata
track_title = str(metadata.get("mp3_title") or metadata.get("title") or fallback_title).strip()
track_artist = str(metadata.get("artist", "")).strip()
track_album = str(metadata.get("album", "")).strip()
track_cover = str(metadata.get("cover_path", "")).strip()
try:
id3_delete(str(file_path))
except Exception:
pass
tags = ID3()
tags.delall("TIT2")
tags.add(TIT2(encoding=3, text=track_title or fallback_title))
tags.delall("TPE1")
if track_artist:
tags.add(TPE1(encoding=3, text=track_artist))
tags.delall("TALB")
if track_album:
tags.add(TALB(encoding=3, text=track_album))
tags.delall("APIC")
if track_cover:
cover_file = Path(track_cover)
if cover_file.exists() and cover_file.is_file():
mime_type = "image/jpeg"
cover_data = None
if cover_file.suffix.lower() in (".png", ".webp"):
try:
from PIL import Image
import io
img = Image.open(cover_file)
buf = io.BytesIO()
img.convert("RGB").save(buf, format="JPEG")
cover_data = buf.getvalue()
mime_type = "image/jpeg"
except Exception:
with open(cover_file, "rb") as stream:
cover_data = stream.read()
mime_type = "image/png" if cover_file.suffix.lower() == ".png" else "image/webp"
else:
with open(cover_file, "rb") as stream:
cover_data = stream.read()
if cover_data:
tags.add(APIC(encoding=3, mime=mime_type, type=3, desc="Cover", data=cover_data))
tags.save(str(file_path), v2_version=4)
renamed_file = self._rename_output_file(file_path, track_title or fallback_title, track_artist)
if renamed_file is not None:
self.progress_text.emit(renamed_file.stem)
def _sanitize_filename_part(self, value: str) -> str:
clean_value = re.sub(r"[<>:\"/\\|?*]", "_", value).strip()
clean_value = re.sub(r"\s+", " ", clean_value)
return clean_value.strip(". ")
def _rename_output_file(self, file_path: Path, title: str, artist: str) -> Optional[Path]:
safe_title = self._sanitize_filename_part(title)
safe_artist = self._sanitize_filename_part(artist)
if safe_artist and safe_title:
base_name = f"{safe_artist} - {safe_title}"
else:
base_name = safe_title or self._sanitize_filename_part(file_path.stem)
if not base_name:
return None
target_path = file_path.with_name(f"{base_name}{file_path.suffix}")
if target_path == file_path:
return file_path
counter = 2
while target_path.exists():
target_path = file_path.with_name(f"{base_name} ({counter}){file_path.suffix}")
counter += 1
try:
file_path.rename(target_path)
return target_path
except Exception:
return None
class YoutubeSearchWorker(QThread):
finished_search = pyqtSignal(list)
failed_search = pyqtSignal(str)
def __init__(self, query: str) -> None:
super().__init__()
self.query = query
def run(self) -> None:
try:
results: list[dict] = []
seen_urls: set[str] = set()
ydl_opts = {"quiet": True, "no_warnings": True, "extract_flat": True}
with YoutubeDL(ydl_opts) as ydl:
video_result = ydl.extract_info(f"ytsearch10:{self.query}", download=False)
playlist_url = f"https://www.youtube.com/results?search_query={quote_plus(self.query)}&sp=EgIQAw%253D%253D"
playlist_result = ydl.extract_info(playlist_url, download=False)
video_entries = video_result.get("entries", []) if isinstance(video_result, dict) else []
for entry in video_entries:
item = self._normalize_video_entry(entry)
if item and item["url"] not in seen_urls:
seen_urls.add(item["url"])
results.append(item)
playlist_entries = playlist_result.get("entries", []) if isinstance(playlist_result, dict) else []
for entry in playlist_entries:
item = self._normalize_playlist_entry(entry)
if item and item["url"] not in seen_urls:
seen_urls.add(item["url"])
results.append(item)
self.finished_search.emit(results)
except Exception:
self.failed_search.emit("search_failed")
def _normalize_video_entry(self, entry: dict) -> Optional[dict]:
if not isinstance(entry, dict):
return None
video_id = entry.get("id")
if not video_id:
return None
title = entry.get("title") or "Untitled"
channel = entry.get("uploader") or entry.get("channel") or ""
duration = self._format_duration(entry.get("duration"))
thumbnail_url = f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
return {
"kind": "video",
"title": title,
"url": f"https://www.youtube.com/watch?v={video_id}",
"channel": channel,
"duration": duration,
"item_count": "",
"thumbnail_url": thumbnail_url,
"thumbnail_bytes": b"",
}
def _normalize_playlist_entry(self, entry: dict) -> Optional[dict]:
if not isinstance(entry, dict):
return None
playlist_id = entry.get("id")
if not playlist_id:
return None
title = entry.get("title") or "Untitled playlist"
channel = entry.get("uploader") or entry.get("channel") or ""
count = entry.get("playlist_count") or entry.get("n_entries") or 0
thumbnails = entry.get("thumbnails") or []
thumbnail_url = ""
if thumbnails and isinstance(thumbnails, list):
thumbnail_url = (thumbnails[-1] or {}).get("url", "")
if not thumbnail_url:
thumbnail_url = f"https://i.ytimg.com/vi/{playlist_id}/hqdefault.jpg"
return {
"kind": "playlist",
"title": title,
"url": f"https://www.youtube.com/playlist?list={playlist_id}",
"channel": channel,
"duration": "",
"item_count": str(count) if count else "",
"thumbnail_url": thumbnail_url,
"thumbnail_bytes": b"",
}
def _format_duration(self, seconds: Optional[int]) -> str:
if not isinstance(seconds, int) or seconds < 0:
return ""
minutes, sec = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}:{minutes:02}:{sec:02}"
return f"{minutes}:{sec:02}"
class PreviewStreamWorker(QThread):
finished_preview = pyqtSignal(str, str)
failed_preview = pyqtSignal(str)
def __init__(self, url: str, title: str) -> None:
super().__init__()
self.url = url
self.title = title
def run(self) -> None:
try:
ydl_opts = {
"quiet": True,
"no_warnings": True,
"format": "best[height<=720][vcodec!=none][acodec!=none]/best[ext=mp4][height<=720]/best",
"noplaylist": True,
}
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(self.url, download=False)
stream_url = info.get("url") if isinstance(info, dict) else None
if stream_url:
self.finished_preview.emit(stream_url, self.title)
else:
self.failed_preview.emit("preview_unavailable")
except Exception:
self.failed_preview.emit("preview_unavailable")
class DownloadManager(QObject):
def apply_album_cover(self, album_name: str, cover_path: str) -> None:
"""Applique la cover à tous les morceaux de l'album (business logic)"""
for idx, track in enumerate(self.track_list):
if str(track.get("album", "")).strip() == album_name:
self.update_track_metadata(
idx,
str(track.get("mp3_title") or track.get("title", "")),
str(track.get("artist", "")),
album_name,
cover_path,
)
started = pyqtSignal()
progress_text = pyqtSignal(str)
progress_value = pyqtSignal(int)
completed = pyqtSignal(str)
failed = pyqtSignal(str)
list_changed = pyqtSignal(list)
queue_finished = pyqtSignal()
downloading_state_changed = pyqtSignal(bool)
search_started = pyqtSignal()
search_results = pyqtSignal(list)
preview_started = pyqtSignal()
preview_ready = pyqtSignal(str, str)
preview_failed = pyqtSignal(str)
def __init__(self, settings_manager: SettingsManager) -> None:
super().__init__()
self.settings_manager = settings_manager
self.download_worker: Optional[AudioDownloadWorker] = None
self.search_worker: Optional[YoutubeSearchWorker] = None
self.preview_worker: Optional[PreviewStreamWorker] = None
self.track_list: list[dict[str, str]] = []
self.download_queue: list[dict[str, str]] = []
self.queue_output_dir: str = ""
self.last_playlist_added_count: int = 0
def get_tracks(self) -> list[dict[str, str]]:
return list(self.track_list)
def _build_track_payload(self, title: str, url: str, artist: str = "", album: str = "", cover_path: str = "") -> dict[str, str]:
clean_title = title.strip()
clean_url = url.strip()
return {
"title": clean_title,
"url": clean_url,
"mp3_title": clean_title,
"artist": artist.strip(),
"album": album.strip(),
"cover_path": cover_path.strip(),
}
def add_track(self, url: str) -> bool:
clean_url = url.strip()
if not self.is_valid_youtube_url(clean_url):
self.failed.emit("invalid_youtube_url")
return False
title = self._extract_title_from_url(clean_url)
if not title:
self.failed.emit("metadata_fetch_failed")
return False
self.track_list.append(self._build_track_payload(title=title, url=clean_url))
self.list_changed.emit(self.get_tracks())
return True
def add_search_result(self, search_result: dict) -> bool:
self.last_playlist_added_count = 0
kind = search_result.get("kind", "video")
url = search_result.get("url", "")
title = search_result.get("title", "")
if kind == "playlist":
tracks = self._extract_playlist_tracks(url)
if not tracks:
self.failed.emit("playlist_expand_failed")
return False
self.track_list.extend(tracks)
self.last_playlist_added_count = len(tracks)
self.list_changed.emit(self.get_tracks())
return True
return self.add_track_with_title(url, title)
def add_track_with_title(self, url: str, title: str) -> bool:
clean_url = url.strip()
clean_title = title.strip()
if not self.is_valid_youtube_url(clean_url):
self.failed.emit("invalid_youtube_url")
return False
if not clean_title:
self.failed.emit("metadata_fetch_failed")
return False
self.track_list.append(self._build_track_payload(title=clean_title, url=clean_url))
self.list_changed.emit(self.get_tracks())
return True
def update_track_metadata(self, index: int, mp3_title: str, artist: str, album: str, cover_path: str) -> bool:
if index < 0 or index >= len(self.track_list):
self.failed.emit("no_track_selected")
return False
track = self.track_list[index]
clean_mp3_title = mp3_title.strip() or track.get("title", "")
clean_artist = artist.strip()
clean_album = album.strip()
clean_cover = cover_path.strip()
# Update in-memory metadata
track["mp3_title"] = clean_mp3_title
track["artist"] = clean_artist
track["album"] = clean_album
track["cover_path"] = clean_cover
# Try to find the file path (assume output dir is known, search for file)
# Use the current filename or try to reconstruct it
output_dir = self.settings_manager.get_config("output_dir")
if not output_dir:
output_dir = "."
output_dir = Path(output_dir)
# Try to find the file by matching artist/title or fallback to most recent
found_file = None
for f in output_dir.glob("*.mp3"):
# Try to match by old artist/title
if f.stem == f"{track.get('artist', '')} - {track.get('mp3_title', '')}":
found_file = f
break
if not found_file:
# fallback: most recent
mp3_files = sorted(output_dir.glob("*.mp3"), key=lambda file_path: file_path.stat().st_mtime, reverse=True)
if mp3_files:
found_file = mp3_files[0]
if found_file and found_file.exists():
# Update ID3 tags
try:
id3_delete(str(found_file))
except Exception:
pass
tags = ID3()
tags.delall("TIT2")
tags.add(TIT2(encoding=3, text=clean_mp3_title or track.get("title", "")))
tags.delall("TPE1")
if clean_artist:
tags.add(TPE1(encoding=3, text=clean_artist))
tags.delall("TALB")
if clean_album:
tags.add(TALB(encoding=3, text=clean_album))
tags.delall("APIC")
if clean_cover:
cover_file = Path(clean_cover)
if cover_file.exists() and cover_file.is_file():
mime_type = "image/jpeg"
if cover_file.suffix.lower() == ".png":
mime_type = "image/png"
elif cover_file.suffix.lower() == ".webp":
mime_type = "image/webp"
with open(cover_file, "rb") as stream:
cover_data = stream.read()
if cover_data:
tags.add(APIC(encoding=3, mime=mime_type, type=3, desc="Cover", data=cover_data))
tags.save(str(found_file), v2_version=4)
# Always rename file to 'artist - title.mp3'
safe_title = self._sanitize_filename_part(clean_mp3_title)
safe_artist = self._sanitize_filename_part(clean_artist)
if safe_artist and safe_title:
base_name = f"{safe_artist} - {safe_title}"
else:
base_name = safe_title or self._sanitize_filename_part(found_file.stem)
target_path = found_file.with_name(f"{base_name}{found_file.suffix}")
if target_path != found_file:
counter = 2
while target_path.exists():
target_path = found_file.with_name(f"{base_name} ({counter}){found_file.suffix}")
counter += 1
try:
found_file.rename(target_path)
except Exception:
pass
self.list_changed.emit(self.get_tracks())
return True
def search_tracks(self, query: str) -> bool:
clean_query = query.strip()
if len(clean_query) < 2:
self.failed.emit("invalid_search_query")
return False
if self.search_worker and self.search_worker.isRunning():
self.failed.emit("search_in_progress")
return False
self.search_worker = YoutubeSearchWorker(clean_query)
self.search_worker.finished_search.connect(self.search_results.emit)
self.search_worker.failed_search.connect(self.failed.emit)
self.search_started.emit()
self.search_worker.start()
return True
def start_preview(self, search_result: dict) -> bool:
url = search_result.get("url", "")
title = search_result.get("title", "")
kind = search_result.get("kind", "video")
if kind != "video":
self.preview_failed.emit("preview_playlist_unavailable")
return False
if not self.is_valid_youtube_url(url):
self.preview_failed.emit("invalid_youtube_url")
return False
if self.preview_worker and self.preview_worker.isRunning():
self.preview_failed.emit("preview_in_progress")
return False
self.preview_worker = PreviewStreamWorker(url, title)
self.preview_worker.finished_preview.connect(self.preview_ready.emit)
self.preview_worker.failed_preview.connect(self.preview_failed.emit)
self.preview_started.emit()
self.preview_worker.start()
return True
def remove_track(self, index: int) -> bool:
if index < 0 or index >= len(self.track_list):
self.failed.emit("no_track_selected")
return False
self.track_list.pop(index)
self.list_changed.emit(self.get_tracks())
return True
def clear_tracks(self) -> None:
self.track_list.clear()
self.list_changed.emit(self.get_tracks())
def download_track_at(self, index: int, output_dir: str) -> bool:
if index < 0 or index >= len(self.track_list):
self.failed.emit("no_track_selected")
return False
return self.download_audio(self.track_list[index]["url"], output_dir, self.track_list[index])
def download_all_tracks(self, output_dir: str) -> bool:
if not self.track_list:
self.failed.emit("empty_download_list")
return False
if not output_dir or not Path(output_dir).exists():
self.failed.emit("invalid_output_folder")
return False
if self.download_worker and self.download_worker.isRunning():
self.failed.emit("download_in_progress")
return False
self.download_queue = list(self.track_list)
self.queue_output_dir = output_dir
self._download_next_in_queue()
return True
def download_audio(self, url: str, output_dir: str, track_metadata: Optional[dict] = None) -> bool:
if not self.is_valid_youtube_url(url):
self.failed.emit("invalid_youtube_url")
return False
if not output_dir or not Path(output_dir).exists():
self.failed.emit("invalid_output_folder")
return False
if self.download_worker and self.download_worker.isRunning():
self.failed.emit("download_in_progress")
return False
audio_format = self.settings_manager.get_audio_format()
audio_quality = self.settings_manager.get_audio_quality()
ffmpeg_location = self.resolve_ffmpeg_location()
self.download_worker = AudioDownloadWorker(url, output_dir, audio_format, audio_quality, ffmpeg_location, track_metadata)
self.download_worker.progress_text.connect(self.progress_text.emit)
self.download_worker.progress_value.connect(self.progress_value.emit)
self.download_worker.completed.connect(self._on_worker_completed)
self.download_worker.failed.connect(self._on_worker_failed)
self.started.emit()
self.downloading_state_changed.emit(True)
self.download_worker.start()
return True
def _download_next_in_queue(self) -> None:
if not self.download_queue:
self.queue_output_dir = ""
self.downloading_state_changed.emit(False)
self.queue_finished.emit()
return
next_track = self.download_queue.pop(0)
self.download_audio(next_track["url"], self.queue_output_dir, next_track)
def _on_worker_completed(self, title: str) -> None:
self.completed.emit(title)
if self.download_queue:
self._download_next_in_queue()
return
self.downloading_state_changed.emit(False)
def _on_worker_failed(self, error_text: str) -> None:
self.download_queue = []
self.queue_output_dir = ""
self.downloading_state_changed.emit(False)
if "ffmpeg" in error_text.lower():
self.failed.emit("ffmpeg_required")
else:
self.failed.emit("audio_download_error")
def is_valid_youtube_url(self, url: str) -> bool:
if not url.startswith(("http://", "https://")):
return False
return "youtube.com" in url or "youtu.be" in url
def _extract_title_from_url(self, url: str) -> Optional[str]:
try:
ydl_opts = {
"quiet": True,
"no_warnings": True,
}
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
return info.get("title") if isinstance(info, dict) else None
except Exception:
return None
def _extract_playlist_tracks(self, playlist_url: str) -> list[dict[str, str]]:
# Try with extract_flat first, fallback to full extraction if it fails
for flat in (True, False):
try:
ydl_opts = {
"quiet": True,
"no_warnings": True,
}
if flat:
ydl_opts["extract_flat"] = True
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(playlist_url, download=False)
entries = info.get("entries", []) if isinstance(info, dict) else []
tracks: list[dict[str, str]] = []
for entry in entries:
if not isinstance(entry, dict):
continue
video_id = entry.get("id")
title = entry.get("title") or "Untitled"
url = entry.get("url")
if video_id:
video_url = f"https://www.youtube.com/watch?v={video_id}"
elif isinstance(url, str) and url.startswith("http"):
video_url = url
elif isinstance(url, str):
video_url = f"https://www.youtube.com/watch?v={url}"
else:
continue
tracks.append(self._build_track_payload(title=title, url=video_url))
if tracks:
return tracks
except Exception as e:
print(f"[yt-dlp playlist extraction error] flat={flat}: {e}")
return []
def resolve_ffmpeg_location(self) -> Optional[str]:
candidates = [
Path("others") / "ffmpeg.exe",
Path("data") / "others" / "ffmpeg.exe",
Path(paths.resource_path("others/ffmpeg.exe")),
Path(paths.resource_path("data/others/ffmpeg.exe")),
]
for candidate in candidates:
if candidate.exists():
return str(candidate.parent)
return None