From 79ba8deb716bc461943b8038d4a71a80f65d9fbc Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Jul 2024 03:06:05 +0200 Subject: [PATCH] [media] Added support for yt-dlp-compatible URLs to `media.download`. Also, added `MediaDownloadEvent`s to keep track of the state of the download. --- platypush/message/event/media.py | 66 ++++++- platypush/plugins/media/__init__.py | 269 ++++++++++++++++++++++++++-- setup.cfg | 1 + 3 files changed, 315 insertions(+), 21 deletions(-) diff --git a/platypush/message/event/media.py b/platypush/message/event/media.py index f7f505e792..0eb046ac11 100644 --- a/platypush/message/event/media.py +++ b/platypush/message/event/media.py @@ -1,11 +1,12 @@ +from abc import ABC from platypush.message.event import Event class MediaEvent(Event): """Base class for media events""" - def __init__(self, player=None, plugin=None, status=None, *args, **kwargs): - super().__init__(player=player, plugin=plugin, status=status, *args, **kwargs) + def __init__(self, *args, player=None, plugin=None, status=None, **kwargs): + super().__init__(*args, player=player, plugin=plugin, status=status, **kwargs) class MediaPlayRequestEvent(MediaEvent): @@ -126,4 +127,65 @@ class NewPlayingMediaEvent(MediaEvent): ) +class MediaDownloadEvent(MediaEvent, ABC): + """ + Base class for media download events. + """ + + def __init__( + self, *args, player=None, plugin=None, resource=None, target=None, **kwargs + ): + """ + :param resource: File name or URI of the downloaded resource + :type resource: str + :param target: Target file name or URI of the downloaded resource + :type target: str + """ + + super().__init__( + *args, + player=player, + plugin=plugin, + resource=resource, + target=target, + **kwargs + ) + + +class MediaDownloadStartedEvent(MediaDownloadEvent): + """ + Event triggered when a media download is started. + """ + + +class MediaDownloadProgressEvent(MediaDownloadEvent): + """ + Event triggered when a media download is in progress. 
+ """ + + def __init__(self, progress: float, *args, **kwargs): + """ + :param progress: Download progress in percentage, between 0 and 100. + """ + super().__init__(*args, progress=progress, **kwargs) + + +class MediaDownloadCompletedEvent(MediaDownloadEvent): + """ + Event triggered when a media download is completed. + """ + + +class MediaDownloadErrorEvent(MediaDownloadEvent): + """ + Event triggered when a media download fails. + """ + + def __init__(self, error: str, *args, **kwargs): + """ + :param error: Error message. + """ + super().__init__(*args, error=error, **kwargs) + + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index 18106ae94e..228ddfa2ad 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -1,3 +1,4 @@ +from contextlib import suppress from dataclasses import dataclass import enum import functools @@ -9,15 +10,34 @@ import re import subprocess import tempfile import threading +import time from abc import ABC, abstractmethod -from typing import Iterable, Optional, List, Dict, Union +from typing import ( + Callable, + Dict, + IO, + Iterable, + List, + Optional, + Tuple, + Type, + Union, +) import requests from platypush.config import Config from platypush.context import get_plugin, get_backend -from platypush.plugins import Plugin, action -from platypush.utils import get_default_downloads_dir +from platypush.message.event.media import ( + MediaDownloadCompletedEvent, + MediaDownloadErrorEvent, + MediaDownloadEvent, + MediaDownloadProgressEvent, + MediaDownloadStartedEvent, + MediaEvent, +) +from platypush.plugins import RunnablePlugin, action +from platypush.utils import get_default_downloads_dir, get_plugin_name_by_class class PlayerState(enum.Enum): @@ -50,7 +70,7 @@ class MediaResource: resolution: Optional[str] = None -class MediaPlugin(Plugin, ABC): +class MediaPlugin(RunnablePlugin, ABC): """ Generic plugin to interact with a media player. 
@@ -170,7 +190,7 @@ class MediaPlugin(Plugin, ABC): env: Optional[Dict[str, str]] = None, volume: Optional[Union[float, int]] = None, torrent_plugin: str = 'torrent', - youtube_format: Optional[str] = 'best[height<=?1080][ext=mp4]', + youtube_format: Optional[str] = 'bv[height<=?1080][ext=mp4]+ba', youtube_dl: str = 'yt-dlp', **kwargs, ): @@ -213,6 +233,7 @@ class MediaPlugin(Plugin, ABC): media_dirs = [] player = None player_config = {} + self._download_threads: Dict[Tuple[str, str], threading.Thread] = {} if self.__class__.__name__ == 'MediaPlugin': # Abstract class, initialize with the default configured player @@ -399,8 +420,8 @@ class MediaPlugin(Plugin, ABC): @action @abstractmethod - def stop(self, **kwargs): - raise self._NOT_IMPLEMENTED_ERR + def stop(self, *args, **kwargs): + super().stop() @action @abstractmethod @@ -683,7 +704,15 @@ class MediaPlugin(Plugin, ABC): output = ytdl.communicate()[0].decode().strip() ytdl.wait() - stream_url, info = output.split('\n') + self.logger.debug('yt-dlp output: %s', output) + lines = output.split('\n') + + if not lines: + self.logger.warning('No output from yt-dlp') + return None + + stream_url = lines[1] if len(lines) > 2 else lines[0] + info = lines[-1] return { **json.loads(info), 'url': stream_url, @@ -772,31 +801,230 @@ class MediaPlugin(Plugin, ABC): @action def download( - self, url: str, filename: Optional[str] = None, directory: Optional[str] = None + self, + url: str, + filename: Optional[str] = None, + directory: Optional[str] = None, + timeout: int = 10, + youtube_format: Optional[str] = None, ): """ - Download a media URL to a local file on the Platypush host. + Download a media URL to a local file on the Platypush host (yt-dlp + required for YouTube URLs). + + This action is non-blocking and returns the path to the downloaded file + once the download is initiated. 
+ + You can then subscribe to these events to monitor the download progress: + + - :class:`platypush.message.event.media.MediaDownloadStartedEvent` + - :class:`platypush.message.event.media.MediaDownloadProgressEvent` + - :class:`platypush.message.event.media.MediaDownloadErrorEvent` + - :class:`platypush.message.event.media.MediaDownloadCompletedEvent` :param url: Media URL. :param filename: Media filename (default: inferred from the URL basename). :param directory: Destination directory (default: ``download_dir``). + :param timeout: Network timeout in seconds (default: 10). + :param youtube: Set to True if the URL is a YouTube video, or any other + URL compatible with yt-dlp. + :param youtube_format: Override the default YouTube format selection. :return: The absolute path to the downloaded file. """ + path = self._get_download_path( + url, directory=directory, filename=filename, youtube_format=youtube_format + ) - if not filename: - filename = url.split('/')[-1] + if self._is_youtube_resource(url): + self._download_youtube_url( + url, path, timeout=timeout, youtube_format=youtube_format + ) + else: + self._download_url(url, path, timeout=timeout) + + return path + + def _get_download_path( + self, + url: str, + directory: Optional[str] = None, + filename: Optional[str] = None, + youtube_format: Optional[str] = None, + ) -> str: if not directory: directory = self.download_dir - path = os.path.join(directory, filename) + directory = os.path.expanduser(directory) + youtube_format = youtube_format or self.youtube_format - with requests.get(url, timeout=20, stream=True) as r: - r.raise_for_status() - with open(path, 'wb') as f: - for chunk in r.iter_content(chunk_size=8192): - f.write(chunk) + if self._is_youtube_resource(url): + with subprocess.Popen( + [ + self._ytdl, + *( + [ + '-f', + youtube_format, + ] + if youtube_format + else [] + ), + '-O', + '%(title)s.%(ext)s', + url, + ], + stdout=subprocess.PIPE, + ) as proc: + assert proc.stdout, 'yt-dlp stdout is None' 
+ filename = proc.stdout.read().decode()[:-1] - return path + if not filename: + filename = url.split('/')[-1] + + return os.path.join(directory, filename) + + def _download_url(self, url: str, path: str, timeout: int): + r = requests.get(url, timeout=timeout, stream=True) + r.raise_for_status() + download_thread = threading.Thread( + target=self._download_url_thread, + args=(r, open(path, 'wb')), # pylint: disable=consider-using-with + ) + + download_thread.start() + self._download_threads[url, path] = download_thread + + def _download_youtube_url( + self, url: str, path: str, timeout: int, youtube_format: Optional[str] = None + ): + ytdl_cmd = [ + self._ytdl, + *( + ['-f', youtube_format or self.youtube_format] + if youtube_format or self.youtube_format + else [] + ), + url, + '-o', + '-', + ] + + self.logger.info('Executing command %r', ytdl_cmd) + download_thread = threading.Thread( + target=self._download_youtube_url_thread, + args=( + subprocess.Popen( # pylint: disable=consider-using-with + ytdl_cmd, stdout=subprocess.PIPE + ), + open(path, 'wb'), # pylint: disable=consider-using-with + url, + ), + kwargs={'timeout': timeout}, + ) + + download_thread.start() + self._download_threads[url, path] = download_thread + + def _download_url_thread(self, response: requests.Response, f: IO): + def on_close(): + with suppress(IOError, OSError, requests.exceptions.RequestException): + response.close() + + size = int(response.headers.get('Content-Length', 0)) or None + self._download_thread_wrapper( + iterator=lambda: response.iter_content(chunk_size=8192), + f=f, + url=response.url, + size=size, + on_close=on_close, + ) + + def _download_youtube_url_thread( + self, proc: subprocess.Popen, f: IO, url: str, timeout: int + ): + def read(): + if not proc.stdout: + return b'' + + return proc.stdout.read(8192) + + def on_close(): + with suppress(IOError, OSError): + proc.terminate() + proc.wait(timeout=5) + if proc.returncode is None: + proc.kill() + + proc_start = 
time.time() + + while not proc.stdout: + if time.time() - proc_start > timeout: + self.logger.warning('yt-dlp process timed out') + on_close() + return + + self.wait_stop(1) + + self._download_thread_wrapper( + iterator=lambda: iter(read, b''), + f=f, + url=url, + size=None, + on_close=on_close, + ) + + def _download_thread_wrapper( + self, + iterator: Callable[[], Iterable[bytes]], + f: IO, + url: str, + size: Optional[int], + on_close: Callable[[], None] = lambda: None, + ): + def post_event(event_type: Type[MediaDownloadEvent], **kwargs): + self._post_event(event_type, resource=url, path=f.name, **kwargs) + + if (url, f.name) in self._download_threads: + self.logger.warning( + 'A download of %s to %s is already in progress', url, f.name + ) + return + + interrupted = False + + try: + self._post_event(MediaDownloadStartedEvent, resource=url, path=f.name) + last_percent = 0 + + for chunk in iterator(): + if not chunk or self.should_stop(): + interrupted = self.should_stop() + break + + f.write(chunk) + percent = f.tell() / size * 100 if size else 0 + if percent and percent - last_percent > 1: + post_event(MediaDownloadProgressEvent, progress=percent) + last_percent = percent + + if not interrupted: + post_event(MediaDownloadCompletedEvent) + except Exception as e: + self.logger.warning('Error while downloading URL: %s', e) + post_event(MediaDownloadErrorEvent, error=str(e)) + finally: + on_close() + + with suppress(IOError, OSError): + f.close() + + self._download_threads.pop((url, f.name), None) + + def _post_event(self, event_type: Type[MediaEvent], **kwargs): + evt = event_type( + player=get_plugin_name_by_class(self.__class__), plugin=self, **kwargs + ) + self._bus.post(evt) def is_local(self): return self._is_local @@ -820,5 +1048,8 @@ class MediaPlugin(Plugin, ABC): f.write(content) return f.name + def main(self): + self.wait_stop() + # vim:sw=4:ts=4:et: diff --git a/setup.cfg b/setup.cfg index ae01bbb467..d303208aea 100644 --- a/setup.cfg +++ b/setup.cfg @@ 
-23,3 +23,4 @@ extend-ignore = W503 SIM104 SIM105 + SIM115