[media] Added support for yt-dlp-compatible URLs to media.download.

Also, added `MediaDownloadEvent`s to keep track of the state of the
download.
This commit is contained in:
Fabio Manganiello 2024-07-14 03:06:05 +02:00
parent 16527417da
commit 96aa22c03e
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
3 changed files with 315 additions and 21 deletions

View file

@ -1,11 +1,12 @@
from abc import ABC
from platypush.message.event import Event from platypush.message.event import Event
class MediaEvent(Event): class MediaEvent(Event):
"""Base class for media events""" """Base class for media events"""
def __init__(self, player=None, plugin=None, status=None, *args, **kwargs): def __init__(self, *args, player=None, plugin=None, status=None, **kwargs):
super().__init__(player=player, plugin=plugin, status=status, *args, **kwargs) super().__init__(*args, player=player, plugin=plugin, status=status, **kwargs)
class MediaPlayRequestEvent(MediaEvent): class MediaPlayRequestEvent(MediaEvent):
@ -126,4 +127,65 @@ class NewPlayingMediaEvent(MediaEvent):
) )
class MediaDownloadEvent(MediaEvent, ABC):
"""
Base class for media download events.
"""
def __init__(
self, *args, player=None, plugin=None, resource=None, target=None, **kwargs
):
"""
:param resource: File name or URI of the downloaded resource
:type resource: str
:param target: Target file name or URI of the downloaded resource
:type target: str
"""
super().__init__(
*args,
player=player,
plugin=plugin,
resource=resource,
target=target,
**kwargs
)
class MediaDownloadStartedEvent(MediaDownloadEvent):
"""
Event triggered when a media download is started.
"""
class MediaDownloadProgressEvent(MediaDownloadEvent):
"""
Event triggered when a media download is in progress.
"""
def __init__(self, progress: float, *args, **kwargs):
"""
:param progress: Download progress in percentage, between 0 and 100.
"""
super().__init__(*args, progress=progress, **kwargs)
class MediaDownloadCompletedEvent(MediaDownloadEvent):
"""
Event triggered when a media download is completed.
"""
class MediaDownloadErrorEvent(MediaDownloadEvent):
"""
Event triggered when a media download fails.
"""
def __init__(self, error: str, *args, **kwargs):
"""
:param error: Error message.
"""
super().__init__(*args, error=error, **kwargs)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,3 +1,4 @@
from contextlib import suppress
from dataclasses import dataclass from dataclasses import dataclass
import enum import enum
import functools import functools
@ -9,15 +10,34 @@ import re
import subprocess import subprocess
import tempfile import tempfile
import threading import threading
import time
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Iterable, Optional, List, Dict, Union from typing import (
Callable,
Dict,
IO,
Iterable,
List,
Optional,
Tuple,
Type,
Union,
)
import requests import requests
from platypush.config import Config from platypush.config import Config
from platypush.context import get_plugin, get_backend from platypush.context import get_plugin, get_backend
from platypush.plugins import Plugin, action from platypush.message.event.media import (
from platypush.utils import get_default_downloads_dir MediaDownloadCompletedEvent,
MediaDownloadErrorEvent,
MediaDownloadEvent,
MediaDownloadProgressEvent,
MediaDownloadStartedEvent,
MediaEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_default_downloads_dir, get_plugin_name_by_class
class PlayerState(enum.Enum): class PlayerState(enum.Enum):
@ -50,7 +70,7 @@ class MediaResource:
resolution: Optional[str] = None resolution: Optional[str] = None
class MediaPlugin(Plugin, ABC): class MediaPlugin(RunnablePlugin, ABC):
""" """
Generic plugin to interact with a media player. Generic plugin to interact with a media player.
@ -170,7 +190,7 @@ class MediaPlugin(Plugin, ABC):
env: Optional[Dict[str, str]] = None, env: Optional[Dict[str, str]] = None,
volume: Optional[Union[float, int]] = None, volume: Optional[Union[float, int]] = None,
torrent_plugin: str = 'torrent', torrent_plugin: str = 'torrent',
youtube_format: Optional[str] = 'best[height<=?1080][ext=mp4]', youtube_format: Optional[str] = 'bv[height<=?1080][ext=mp4]+ba',
youtube_dl: str = 'yt-dlp', youtube_dl: str = 'yt-dlp',
**kwargs, **kwargs,
): ):
@ -213,6 +233,7 @@ class MediaPlugin(Plugin, ABC):
media_dirs = [] media_dirs = []
player = None player = None
player_config = {} player_config = {}
self._download_threads: Dict[Tuple[str, str], threading.Thread] = {}
if self.__class__.__name__ == 'MediaPlugin': if self.__class__.__name__ == 'MediaPlugin':
# Abstract class, initialize with the default configured player # Abstract class, initialize with the default configured player
@ -399,8 +420,8 @@ class MediaPlugin(Plugin, ABC):
@action @action
@abstractmethod @abstractmethod
def stop(self, **kwargs): def stop(self, *args, **kwargs):
raise self._NOT_IMPLEMENTED_ERR super().stop()
@action @action
@abstractmethod @abstractmethod
@ -683,7 +704,15 @@ class MediaPlugin(Plugin, ABC):
output = ytdl.communicate()[0].decode().strip() output = ytdl.communicate()[0].decode().strip()
ytdl.wait() ytdl.wait()
stream_url, info = output.split('\n') self.logger.debug('yt-dlp output: %s', output)
lines = output.split('\n')
if not lines:
self.logger.warning('No output from yt-dlp')
return None
stream_url = lines[1] if len(lines) > 2 else lines[0]
info = lines[-1]
return { return {
**json.loads(info), **json.loads(info),
'url': stream_url, 'url': stream_url,
@ -772,31 +801,230 @@ class MediaPlugin(Plugin, ABC):
@action @action
def download( def download(
self, url: str, filename: Optional[str] = None, directory: Optional[str] = None self,
url: str,
filename: Optional[str] = None,
directory: Optional[str] = None,
timeout: int = 10,
youtube_format: Optional[str] = None,
): ):
""" """
Download a media URL to a local file on the Platypush host. Download a media URL to a local file on the Platypush host (yt-dlp
required for YouTube URLs).
This action is non-blocking and returns the path to the downloaded file
once the download is initiated.
You can then subscribe to these events to monitor the download progress:
- :class:`platypush.message.event.media.MediaDownloadStartedEvent`
- :class:`platypush.message.event.media.MediaDownloadProgressEvent`
- :class:`platypush.message.event.media.MediaDownloadErrorEvent`
- :class:`platypush.message.event.media.MediaDownloadFinishedEvent`
:param url: Media URL. :param url: Media URL.
:param filename: Media filename (default: inferred from the URL basename). :param filename: Media filename (default: inferred from the URL basename).
:param directory: Destination directory (default: ``download_dir``). :param directory: Destination directory (default: ``download_dir``).
:param timeout: Network timeout in seconds (default: 10).
:param youtube: Set to True if the URL is a YouTube video, or any other
URL compatible with yt-dlp.
:param youtube_format: Override the default YouTube format selection.
:return: The absolute path to the downloaded file. :return: The absolute path to the downloaded file.
""" """
path = self._get_download_path(
url, directory=directory, filename=filename, youtube_format=youtube_format
)
if not filename: if self._is_youtube_resource(url):
filename = url.split('/')[-1] self._download_youtube_url(
url, path, timeout=timeout, youtube_format=youtube_format
)
else:
self._download_url(url, path, timeout=timeout)
return path
def _get_download_path(
self,
url: str,
directory: Optional[str] = None,
filename: Optional[str] = None,
youtube_format: Optional[str] = None,
) -> str:
if not directory: if not directory:
directory = self.download_dir directory = self.download_dir
path = os.path.join(directory, filename) directory = os.path.expanduser(directory)
youtube_format = youtube_format or self.youtube_format
with requests.get(url, timeout=20, stream=True) as r: if self._is_youtube_resource(url):
with subprocess.Popen(
[
self._ytdl,
*(
[
'-f',
youtube_format,
]
if youtube_format
else []
),
'-O',
'%(title)s.%(ext)s',
url,
],
stdout=subprocess.PIPE,
) as proc:
assert proc.stdout, 'yt-dlp stdout is None'
filename = proc.stdout.read().decode()[:-1]
if not filename:
filename = url.split('/')[-1]
return os.path.join(directory, filename)
def _download_url(self, url: str, path: str, timeout: int):
r = requests.get(url, timeout=timeout, stream=True)
r.raise_for_status() r.raise_for_status()
with open(path, 'wb') as f: download_thread = threading.Thread(
for chunk in r.iter_content(chunk_size=8192): target=self._download_url_thread,
f.write(chunk) args=(r, open(path, 'wb')), # pylint: disable=consider-using-with
)
return path download_thread.start()
self._download_threads[url, path] = download_thread
def _download_youtube_url(
self, url: str, path: str, timeout: int, youtube_format: Optional[str] = None
):
ytdl_cmd = [
self._ytdl,
*(
['-f', youtube_format or self.youtube_format]
if youtube_format or self.youtube_format
else []
),
url,
'-o',
'-',
]
self.logger.info('Executing command %r', ytdl_cmd)
download_thread = threading.Thread(
target=self._download_youtube_url_thread,
args=(
subprocess.Popen( # pylint: disable=consider-using-with
ytdl_cmd, stdout=subprocess.PIPE
),
open(path, 'wb'), # pylint: disable=consider-using-with
url,
),
kwargs={'timeout': timeout},
)
download_thread.start()
self._download_threads[url, path] = download_thread
def _download_url_thread(self, response: requests.Response, f: IO):
def on_close():
with suppress(IOError, OSError, requests.exceptions.RequestException):
response.close()
size = int(response.headers.get('Content-Length', 0)) or None
self._download_thread_wrapper(
iterator=lambda: response.iter_content(chunk_size=8192),
f=f,
url=response.url,
size=size,
on_close=on_close,
)
def _download_youtube_url_thread(
self, proc: subprocess.Popen, f: IO, url: str, timeout: int
):
def read():
if not proc.stdout:
return b''
return proc.stdout.read(8192)
def on_close():
with suppress(IOError, OSError):
proc.terminate()
proc.wait(timeout=5)
if proc.returncode is None:
proc.kill()
proc_start = time.time()
while not proc.stdout:
if time.time() - proc_start > timeout:
self.logger.warning('yt-dlp process timed out')
on_close()
return
self.wait_stop(1)
self._download_thread_wrapper(
iterator=lambda: iter(read, b''),
f=f,
url=url,
size=None,
on_close=on_close,
)
def _download_thread_wrapper(
self,
iterator: Callable[[], Iterable[bytes]],
f: IO,
url: str,
size: Optional[int],
on_close: Callable[[], None] = lambda: None,
):
def post_event(event_type: Type[MediaDownloadEvent], **kwargs):
self._post_event(event_type, resource=url, path=f.name, **kwargs)
if (url, f.name) in self._download_threads:
self.logger.warning(
'A download of %s to %s is already in progress', url, f.name
)
return
interrupted = False
try:
self._post_event(MediaDownloadStartedEvent, resource=url, path=f.name)
last_percent = 0
for chunk in iterator():
if not chunk or self.should_stop():
interrupted = self.should_stop()
break
f.write(chunk)
percent = f.tell() / size * 100 if size else 0
if percent and percent - last_percent > 1:
post_event(MediaDownloadProgressEvent, progress=percent)
last_percent = percent
if not interrupted:
post_event(MediaDownloadCompletedEvent)
except Exception as e:
self.logger.warning('Error while downloading URL: %s', e)
post_event(MediaDownloadErrorEvent, error=str(e))
finally:
on_close()
with suppress(IOError, OSError):
f.close()
self._download_threads.pop((url, f.name), None)
def _post_event(self, event_type: Type[MediaEvent], **kwargs):
evt = event_type(
player=get_plugin_name_by_class(self.__class__), plugin=self, **kwargs
)
self._bus.post(evt)
def is_local(self): def is_local(self):
return self._is_local return self._is_local
@ -820,5 +1048,8 @@ class MediaPlugin(Plugin, ABC):
f.write(content) f.write(content)
return f.name return f.name
def main(self):
self.wait_stop()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -23,3 +23,4 @@ extend-ignore =
W503 W503
SIM104 SIM104
SIM105 SIM105
SIM115