forked from platypush/platypush
[media] Support for generic media downloads.
This commit is contained in:
parent
dce6096020
commit
b43c4612fd
7 changed files with 683 additions and 233 deletions
|
@ -1,4 +1,5 @@
|
||||||
from abc import ABC
|
from abc import ABC
|
||||||
|
from typing import Optional
|
||||||
from platypush.message.event import Event
|
from platypush.message.event import Event
|
||||||
|
|
||||||
|
|
||||||
|
@ -133,24 +134,48 @@ class MediaDownloadEvent(MediaEvent, ABC):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, *args, player=None, plugin=None, resource=None, target=None, **kwargs
|
self,
|
||||||
|
*args,
|
||||||
|
plugin: str,
|
||||||
|
resource: str,
|
||||||
|
state: str,
|
||||||
|
path: str,
|
||||||
|
player: Optional[str] = None,
|
||||||
|
size: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = None,
|
||||||
|
progress: Optional[float] = None,
|
||||||
|
started_at: Optional[float] = None,
|
||||||
|
ended_at: Optional[float] = None,
|
||||||
|
**kwargs
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
:param resource: File name or URI of the downloaded resource
|
:param resource: File name or URI of the downloaded resource
|
||||||
:type resource: str
|
:param url: Alias for resource
|
||||||
:param target: Target file name or URI of the downloaded resource
|
:param path: Path where the resource is downloaded
|
||||||
:type target: str
|
:param state: Download state
|
||||||
|
:param size: Size of the downloaded resource in bytes
|
||||||
|
:param timeout: Download timeout in seconds
|
||||||
|
:param progress: Download progress in percentage, between 0 and 100
|
||||||
|
:param started_at: Download start time
|
||||||
|
:param ended_at: Download end time
|
||||||
"""
|
"""
|
||||||
|
|
||||||
super().__init__(
|
kwargs.update(
|
||||||
*args,
|
{
|
||||||
player=player,
|
"resource": resource,
|
||||||
plugin=plugin,
|
"path": path,
|
||||||
resource=resource,
|
"url": resource,
|
||||||
target=target,
|
"state": state,
|
||||||
**kwargs
|
"size": size,
|
||||||
|
"timeout": timeout,
|
||||||
|
"progress": progress,
|
||||||
|
"started_at": started_at,
|
||||||
|
"ended_at": ended_at,
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
super().__init__(*args, player=player, plugin=plugin, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class MediaDownloadStartedEvent(MediaDownloadEvent):
|
class MediaDownloadStartedEvent(MediaDownloadEvent):
|
||||||
"""
|
"""
|
||||||
|
@ -163,12 +188,6 @@ class MediaDownloadProgressEvent(MediaDownloadEvent):
|
||||||
Event triggered when a media download is in progress.
|
Event triggered when a media download is in progress.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, progress: float, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
:param progress: Download progress in percentage, between 0 and 100.
|
|
||||||
"""
|
|
||||||
super().__init__(*args, progress=progress, **kwargs)
|
|
||||||
|
|
||||||
|
|
||||||
class MediaDownloadCompletedEvent(MediaDownloadEvent):
|
class MediaDownloadCompletedEvent(MediaDownloadEvent):
|
||||||
"""
|
"""
|
||||||
|
@ -188,4 +207,28 @@ class MediaDownloadErrorEvent(MediaDownloadEvent):
|
||||||
super().__init__(*args, error=error, **kwargs)
|
super().__init__(*args, error=error, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class MediaDownloadPausedEvent(MediaDownloadEvent):
|
||||||
|
"""
|
||||||
|
Event triggered when a media download is paused.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class MediaDownloadResumedEvent(MediaDownloadEvent):
|
||||||
|
"""
|
||||||
|
Event triggered when a media download is resumed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class MediaDownloadCancelledEvent(MediaDownloadEvent):
|
||||||
|
"""
|
||||||
|
Event triggered when a media download is cancelled.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class MediaDownloadClearEvent(MediaDownloadEvent):
|
||||||
|
"""
|
||||||
|
Event triggered when a download is cleared from the queue.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,6 +1,3 @@
|
||||||
from contextlib import suppress
|
|
||||||
from dataclasses import dataclass
|
|
||||||
import enum
|
|
||||||
import functools
|
import functools
|
||||||
import inspect
|
import inspect
|
||||||
import json
|
import json
|
||||||
|
@ -10,12 +7,9 @@ import re
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
import threading
|
import threading
|
||||||
import time
|
|
||||||
from abc import ABC, abstractmethod
|
from abc import ABC, abstractmethod
|
||||||
from typing import (
|
from typing import (
|
||||||
Callable,
|
|
||||||
Dict,
|
Dict,
|
||||||
IO,
|
|
||||||
Iterable,
|
Iterable,
|
||||||
List,
|
List,
|
||||||
Optional,
|
Optional,
|
||||||
|
@ -28,46 +22,18 @@ import requests
|
||||||
|
|
||||||
from platypush.config import Config
|
from platypush.config import Config
|
||||||
from platypush.context import get_plugin, get_backend
|
from platypush.context import get_plugin, get_backend
|
||||||
from platypush.message.event.media import (
|
from platypush.message.event.media import MediaEvent
|
||||||
MediaDownloadCompletedEvent,
|
|
||||||
MediaDownloadErrorEvent,
|
|
||||||
MediaDownloadEvent,
|
|
||||||
MediaDownloadProgressEvent,
|
|
||||||
MediaDownloadStartedEvent,
|
|
||||||
MediaEvent,
|
|
||||||
)
|
|
||||||
from platypush.plugins import RunnablePlugin, action
|
from platypush.plugins import RunnablePlugin, action
|
||||||
from platypush.utils import get_default_downloads_dir, get_plugin_name_by_class
|
from platypush.utils import get_default_downloads_dir, get_plugin_name_by_class
|
||||||
|
|
||||||
|
from ._download import (
|
||||||
class PlayerState(enum.Enum):
|
DownloadState,
|
||||||
"""
|
DownloadThread,
|
||||||
Models the possible states of a media player
|
FileDownloadThread,
|
||||||
"""
|
YouTubeDownloadThread,
|
||||||
|
)
|
||||||
STOP = 'stop'
|
from ._resource import MediaResource
|
||||||
PLAY = 'play'
|
from ._state import PlayerState
|
||||||
PAUSE = 'pause'
|
|
||||||
IDLE = 'idle'
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class MediaResource:
|
|
||||||
"""
|
|
||||||
Models a media resource
|
|
||||||
"""
|
|
||||||
|
|
||||||
resource: str
|
|
||||||
url: str
|
|
||||||
title: Optional[str] = None
|
|
||||||
description: Optional[str] = None
|
|
||||||
filename: Optional[str] = None
|
|
||||||
image: Optional[str] = None
|
|
||||||
duration: Optional[float] = None
|
|
||||||
channel: Optional[str] = None
|
|
||||||
channel_url: Optional[str] = None
|
|
||||||
type: Optional[str] = None
|
|
||||||
resolution: Optional[str] = None
|
|
||||||
|
|
||||||
|
|
||||||
class MediaPlugin(RunnablePlugin, ABC):
|
class MediaPlugin(RunnablePlugin, ABC):
|
||||||
|
@ -233,7 +199,7 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
media_dirs = []
|
media_dirs = []
|
||||||
player = None
|
player = None
|
||||||
player_config = {}
|
player_config = {}
|
||||||
self._download_threads: Dict[Tuple[str, str], threading.Thread] = {}
|
self._download_threads: Dict[Tuple[str, str], DownloadThread] = {}
|
||||||
|
|
||||||
if self.__class__.__name__ == 'MediaPlugin':
|
if self.__class__.__name__ == 'MediaPlugin':
|
||||||
# Abstract class, initialize with the default configured player
|
# Abstract class, initialize with the default configured player
|
||||||
|
@ -357,6 +323,7 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
)
|
)
|
||||||
elif self._is_youtube_resource(resource):
|
elif self._is_youtube_resource(resource):
|
||||||
info = self._get_youtube_info(resource)
|
info = self._get_youtube_info(resource)
|
||||||
|
if info:
|
||||||
url = info.get('url')
|
url = info.get('url')
|
||||||
if url:
|
if url:
|
||||||
resource = url
|
resource = url
|
||||||
|
@ -420,7 +387,7 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
|
|
||||||
@action
|
@action
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def stop(self, *args, **kwargs):
|
def stop(self, *args, **kwargs): # type: ignore
|
||||||
super().stop()
|
super().stop()
|
||||||
|
|
||||||
@action
|
@action
|
||||||
|
@ -737,15 +704,6 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@action
|
|
||||||
def get_youtube_url(self, url, youtube_format: Optional[str] = None):
|
|
||||||
youtube_id = self.get_youtube_id(url)
|
|
||||||
if youtube_id:
|
|
||||||
url = f'https://www.youtube.com/watch?v={youtube_id}'
|
|
||||||
return self._get_youtube_info(url, youtube_format=youtube_format).get('url')
|
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def get_youtube_info(self, url):
|
def get_youtube_info(self, url):
|
||||||
# Legacy conversion for Mopidy YouTube URIs
|
# Legacy conversion for Mopidy YouTube URIs
|
||||||
|
@ -806,6 +764,7 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
filename: Optional[str] = None,
|
filename: Optional[str] = None,
|
||||||
directory: Optional[str] = None,
|
directory: Optional[str] = None,
|
||||||
timeout: int = 10,
|
timeout: int = 10,
|
||||||
|
sync: bool = False,
|
||||||
youtube_format: Optional[str] = None,
|
youtube_format: Optional[str] = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
|
@ -820,14 +779,17 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
- :class:`platypush.message.event.media.MediaDownloadStartedEvent`
|
- :class:`platypush.message.event.media.MediaDownloadStartedEvent`
|
||||||
- :class:`platypush.message.event.media.MediaDownloadProgressEvent`
|
- :class:`platypush.message.event.media.MediaDownloadProgressEvent`
|
||||||
- :class:`platypush.message.event.media.MediaDownloadErrorEvent`
|
- :class:`platypush.message.event.media.MediaDownloadErrorEvent`
|
||||||
- :class:`platypush.message.event.media.MediaDownloadFinishedEvent`
|
- :class:`platypush.message.event.media.MediaDownloadPausedEvent`
|
||||||
|
- :class:`platypush.message.event.media.MediaDownloadResumedEvent`
|
||||||
|
- :class:`platypush.message.event.media.MediaDownloadCancelledEvent`
|
||||||
|
- :class:`platypush.message.event.media.MediaDownloadCompletedEvent`
|
||||||
|
|
||||||
:param url: Media URL.
|
:param url: Media URL.
|
||||||
:param filename: Media filename (default: inferred from the URL basename).
|
:param filename: Media filename (default: inferred from the URL basename).
|
||||||
:param directory: Destination directory (default: ``download_dir``).
|
:param directory: Destination directory (default: ``download_dir``).
|
||||||
:param timeout: Network timeout in seconds (default: 10).
|
:param timeout: Network timeout in seconds (default: 10).
|
||||||
:param youtube: Set to True if the URL is a YouTube video, or any other
|
:param sync: If set to True, the download will be synchronous and the
|
||||||
URL compatible with yt-dlp.
|
action will return only when the download is completed.
|
||||||
:param youtube_format: Override the default YouTube format selection.
|
:param youtube_format: Override the default YouTube format selection.
|
||||||
:return: The absolute path to the downloaded file.
|
:return: The absolute path to the downloaded file.
|
||||||
"""
|
"""
|
||||||
|
@ -836,14 +798,124 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
)
|
)
|
||||||
|
|
||||||
if self._is_youtube_resource(url):
|
if self._is_youtube_resource(url):
|
||||||
self._download_youtube_url(
|
dl_thread = self._download_youtube_url(
|
||||||
url, path, timeout=timeout, youtube_format=youtube_format
|
url, path, youtube_format=youtube_format
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
self._download_url(url, path, timeout=timeout)
|
dl_thread = self._download_url(url, path, timeout=timeout)
|
||||||
|
|
||||||
|
if sync:
|
||||||
|
dl_thread.join()
|
||||||
|
|
||||||
return path
|
return path
|
||||||
|
|
||||||
|
@action
|
||||||
|
def pause_download(self, url: Optional[str] = None, path: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Pause a download in progress.
|
||||||
|
|
||||||
|
Either the URL or the path must be specified.
|
||||||
|
|
||||||
|
:param url: URL of the download.
|
||||||
|
:param path: Path of the download (default: any path associated with the URL).
|
||||||
|
"""
|
||||||
|
for thread in self._get_downloads(url=url, path=path):
|
||||||
|
thread.pause()
|
||||||
|
|
||||||
|
@action
|
||||||
|
def resume_download(self, url: Optional[str] = None, path: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Resume a paused download.
|
||||||
|
|
||||||
|
Either the URL or the path must be specified.
|
||||||
|
|
||||||
|
:param url: URL of the download.
|
||||||
|
:param path: Path of the download (default: any path associated with the URL).
|
||||||
|
"""
|
||||||
|
for thread in self._get_downloads(url=url, path=path):
|
||||||
|
thread.resume()
|
||||||
|
|
||||||
|
@action
|
||||||
|
def cancel_download(self, url: Optional[str] = None, path: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Cancel a download in progress.
|
||||||
|
|
||||||
|
Either the URL or the path must be specified.
|
||||||
|
|
||||||
|
:param url: URL of the download.
|
||||||
|
:param path: Path of the download (default: any path associated with the URL).
|
||||||
|
"""
|
||||||
|
for thread in self._get_downloads(url=url, path=path):
|
||||||
|
thread.stop()
|
||||||
|
|
||||||
|
@action
|
||||||
|
def clear_downloads(self, url: Optional[str] = None, path: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Clear completed/cancelled downloads from the queue.
|
||||||
|
|
||||||
|
:param url: URL of the download (default: all downloads).
|
||||||
|
:param path: Path of the download (default: any path associated with the URL).
|
||||||
|
"""
|
||||||
|
threads = (
|
||||||
|
self._get_downloads(url=url, path=path)
|
||||||
|
if url
|
||||||
|
else list(self._download_threads.values())
|
||||||
|
)
|
||||||
|
|
||||||
|
for thread in threads:
|
||||||
|
if thread.state not in (DownloadState.COMPLETED, DownloadState.CANCELLED):
|
||||||
|
continue
|
||||||
|
|
||||||
|
dl = self._download_threads.pop((thread.url, thread.path), None)
|
||||||
|
if dl:
|
||||||
|
dl.clear()
|
||||||
|
|
||||||
|
@action
|
||||||
|
def get_downloads(self, url: Optional[str] = None, path: Optional[str] = None):
|
||||||
|
"""
|
||||||
|
Get the download threads.
|
||||||
|
|
||||||
|
:param url: URL of the download (default: all downloads).
|
||||||
|
:param path: Path of the download (default: any path associated with the URL).
|
||||||
|
:return: .. schema:: media.download.MediaDownloadSchema(many=True)
|
||||||
|
"""
|
||||||
|
from platypush.schemas.media.download import MediaDownloadSchema
|
||||||
|
|
||||||
|
return MediaDownloadSchema().dump(
|
||||||
|
(
|
||||||
|
self._get_downloads(url=url, path=path)
|
||||||
|
if url
|
||||||
|
else list(self._download_threads.values())
|
||||||
|
),
|
||||||
|
many=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_downloads(self, url: Optional[str] = None, path: Optional[str] = None):
|
||||||
|
assert url or path, 'URL or path must be specified'
|
||||||
|
threads = []
|
||||||
|
|
||||||
|
if url and path:
|
||||||
|
path = os.path.expanduser(path)
|
||||||
|
thread = self._download_threads.get((url, path))
|
||||||
|
if thread:
|
||||||
|
threads = [thread]
|
||||||
|
elif url:
|
||||||
|
threads = [
|
||||||
|
thread
|
||||||
|
for (url_, _), thread in self._download_threads.items()
|
||||||
|
if url_ == url
|
||||||
|
]
|
||||||
|
elif path:
|
||||||
|
path = os.path.expanduser(path)
|
||||||
|
threads = [
|
||||||
|
thread
|
||||||
|
for (_, path_), thread in self._download_threads.items()
|
||||||
|
if path_ == path
|
||||||
|
]
|
||||||
|
|
||||||
|
assert threads, f'No matching downloads found for [url={url}, path={path}]'
|
||||||
|
return threads
|
||||||
|
|
||||||
def _get_download_path(
|
def _get_download_path(
|
||||||
self,
|
self,
|
||||||
url: str,
|
url: str,
|
||||||
|
@ -883,142 +955,46 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
|
|
||||||
return os.path.join(directory, filename)
|
return os.path.join(directory, filename)
|
||||||
|
|
||||||
def _download_url(self, url: str, path: str, timeout: int):
|
def _download_url(self, url: str, path: str, timeout: int) -> FileDownloadThread:
|
||||||
r = requests.get(url, timeout=timeout, stream=True)
|
download_thread = FileDownloadThread(
|
||||||
r.raise_for_status()
|
url=url,
|
||||||
download_thread = threading.Thread(
|
path=path,
|
||||||
target=self._download_url_thread,
|
timeout=timeout,
|
||||||
args=(r, open(path, 'wb')), # pylint: disable=consider-using-with
|
on_start=self._on_download_start,
|
||||||
|
post_event=self._post_event,
|
||||||
|
stop_event=self._should_stop,
|
||||||
)
|
)
|
||||||
|
|
||||||
download_thread.start()
|
self._start_download(download_thread)
|
||||||
self._download_threads[url, path] = download_thread
|
return download_thread
|
||||||
|
|
||||||
def _download_youtube_url(
|
def _download_youtube_url(
|
||||||
self, url: str, path: str, timeout: int, youtube_format: Optional[str] = None
|
self, url: str, path: str, youtube_format: Optional[str] = None
|
||||||
):
|
) -> YouTubeDownloadThread:
|
||||||
ytdl_cmd = [
|
download_thread = YouTubeDownloadThread(
|
||||||
self._ytdl,
|
|
||||||
*(
|
|
||||||
['-f', youtube_format or self.youtube_format]
|
|
||||||
if youtube_format or self.youtube_format
|
|
||||||
else []
|
|
||||||
),
|
|
||||||
url,
|
|
||||||
'-o',
|
|
||||||
'-',
|
|
||||||
]
|
|
||||||
|
|
||||||
self.logger.info('Executing command %r', ytdl_cmd)
|
|
||||||
download_thread = threading.Thread(
|
|
||||||
target=self._download_youtube_url_thread,
|
|
||||||
args=(
|
|
||||||
subprocess.Popen( # pylint: disable=consider-using-with
|
|
||||||
ytdl_cmd, stdout=subprocess.PIPE
|
|
||||||
),
|
|
||||||
open(path, 'wb'), # pylint: disable=consider-using-with
|
|
||||||
url,
|
|
||||||
),
|
|
||||||
kwargs={'timeout': timeout},
|
|
||||||
)
|
|
||||||
|
|
||||||
download_thread.start()
|
|
||||||
self._download_threads[url, path] = download_thread
|
|
||||||
|
|
||||||
def _download_url_thread(self, response: requests.Response, f: IO):
|
|
||||||
def on_close():
|
|
||||||
with suppress(IOError, OSError, requests.exceptions.RequestException):
|
|
||||||
response.close()
|
|
||||||
|
|
||||||
size = int(response.headers.get('Content-Length', 0)) or None
|
|
||||||
self._download_thread_wrapper(
|
|
||||||
iterator=lambda: response.iter_content(chunk_size=8192),
|
|
||||||
f=f,
|
|
||||||
url=response.url,
|
|
||||||
size=size,
|
|
||||||
on_close=on_close,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _download_youtube_url_thread(
|
|
||||||
self, proc: subprocess.Popen, f: IO, url: str, timeout: int
|
|
||||||
):
|
|
||||||
def read():
|
|
||||||
if not proc.stdout:
|
|
||||||
return b''
|
|
||||||
|
|
||||||
return proc.stdout.read(8192)
|
|
||||||
|
|
||||||
def on_close():
|
|
||||||
with suppress(IOError, OSError):
|
|
||||||
proc.terminate()
|
|
||||||
proc.wait(timeout=5)
|
|
||||||
if proc.returncode is None:
|
|
||||||
proc.kill()
|
|
||||||
|
|
||||||
proc_start = time.time()
|
|
||||||
|
|
||||||
while not proc.stdout:
|
|
||||||
if time.time() - proc_start > timeout:
|
|
||||||
self.logger.warning('yt-dlp process timed out')
|
|
||||||
on_close()
|
|
||||||
return
|
|
||||||
|
|
||||||
self.wait_stop(1)
|
|
||||||
|
|
||||||
self._download_thread_wrapper(
|
|
||||||
iterator=lambda: iter(read, b''),
|
|
||||||
f=f,
|
|
||||||
url=url,
|
url=url,
|
||||||
size=None,
|
path=path,
|
||||||
on_close=on_close,
|
ytdl=self._ytdl,
|
||||||
|
youtube_format=youtube_format or self.youtube_format,
|
||||||
|
on_start=self._on_download_start,
|
||||||
|
post_event=self._post_event,
|
||||||
|
stop_event=self._should_stop,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _download_thread_wrapper(
|
self._start_download(download_thread)
|
||||||
self,
|
return download_thread
|
||||||
iterator: Callable[[], Iterable[bytes]],
|
|
||||||
f: IO,
|
|
||||||
url: str,
|
|
||||||
size: Optional[int],
|
|
||||||
on_close: Callable[[], None] = lambda: None,
|
|
||||||
):
|
|
||||||
def post_event(event_type: Type[MediaDownloadEvent], **kwargs):
|
|
||||||
self._post_event(event_type, resource=url, path=f.name, **kwargs)
|
|
||||||
|
|
||||||
if (url, f.name) in self._download_threads:
|
def _on_download_start(self, thread: DownloadThread):
|
||||||
|
self._download_threads[thread.url, thread.path] = thread
|
||||||
|
|
||||||
|
def _start_download(self, thread: DownloadThread):
|
||||||
|
if (thread.url, thread.path) in self._download_threads:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'A download of %s to %s is already in progress', url, f.name
|
'A download of %s to %s is already in progress', thread.url, thread.path
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
interrupted = False
|
thread.start()
|
||||||
|
|
||||||
try:
|
|
||||||
self._post_event(MediaDownloadStartedEvent, resource=url, path=f.name)
|
|
||||||
last_percent = 0
|
|
||||||
|
|
||||||
for chunk in iterator():
|
|
||||||
if not chunk or self.should_stop():
|
|
||||||
interrupted = self.should_stop()
|
|
||||||
break
|
|
||||||
|
|
||||||
f.write(chunk)
|
|
||||||
percent = f.tell() / size * 100 if size else 0
|
|
||||||
if percent and percent - last_percent > 1:
|
|
||||||
post_event(MediaDownloadProgressEvent, progress=percent)
|
|
||||||
last_percent = percent
|
|
||||||
|
|
||||||
if not interrupted:
|
|
||||||
post_event(MediaDownloadCompletedEvent)
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.warning('Error while downloading URL: %s', e)
|
|
||||||
post_event(MediaDownloadErrorEvent, error=str(e))
|
|
||||||
finally:
|
|
||||||
on_close()
|
|
||||||
|
|
||||||
with suppress(IOError, OSError):
|
|
||||||
f.close()
|
|
||||||
|
|
||||||
self._download_threads.pop((url, f.name), None)
|
|
||||||
|
|
||||||
def _post_event(self, event_type: Type[MediaEvent], **kwargs):
|
def _post_event(self, event_type: Type[MediaEvent], **kwargs):
|
||||||
evt = event_type(
|
evt = event_type(
|
||||||
|
@ -1052,4 +1028,11 @@ class MediaPlugin(RunnablePlugin, ABC):
|
||||||
self.wait_stop()
|
self.wait_stop()
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'DownloadState',
|
||||||
|
'MediaPlugin',
|
||||||
|
'PlayerState',
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
341
platypush/plugins/media/_download.py
Normal file
341
platypush/plugins/media/_download.py
Normal file
|
@ -0,0 +1,341 @@
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from contextlib import suppress
|
||||||
|
from enum import Enum
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import signal
|
||||||
|
import subprocess
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from typing import Any, Callable, Optional, Type
|
||||||
|
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from platypush.message.event.media import (
|
||||||
|
MediaDownloadCancelledEvent,
|
||||||
|
MediaDownloadClearEvent,
|
||||||
|
MediaDownloadCompletedEvent,
|
||||||
|
MediaDownloadErrorEvent,
|
||||||
|
MediaDownloadEvent,
|
||||||
|
MediaDownloadPausedEvent,
|
||||||
|
MediaDownloadProgressEvent,
|
||||||
|
MediaDownloadResumedEvent,
|
||||||
|
MediaDownloadStartedEvent,
|
||||||
|
)
|
||||||
|
|
||||||
|
from platypush.utils import wait_for_either
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadState(Enum):
|
||||||
|
"""
|
||||||
|
Enum that represents the status of a download.
|
||||||
|
"""
|
||||||
|
|
||||||
|
IDLE = 'idle'
|
||||||
|
STARTED = 'started'
|
||||||
|
DOWNLOADING = 'downloading'
|
||||||
|
PAUSED = 'paused'
|
||||||
|
COMPLETED = 'completed'
|
||||||
|
CANCELLED = 'cancelled'
|
||||||
|
ERROR = 'error'
|
||||||
|
|
||||||
|
|
||||||
|
class DownloadThread(threading.Thread, ABC):
|
||||||
|
"""
|
||||||
|
Thread that downloads a URL to a file.
|
||||||
|
"""
|
||||||
|
|
||||||
|
_progress_update_interval = 1
|
||||||
|
""" Throttle the progress updates to this interval, in seconds. """
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
path: str,
|
||||||
|
url: str,
|
||||||
|
post_event: Callable,
|
||||||
|
size: Optional[int] = None,
|
||||||
|
timeout: Optional[int] = 10,
|
||||||
|
on_start: Callable[['DownloadThread'], None] = lambda _: None,
|
||||||
|
on_close: Callable[['DownloadThread'], None] = lambda _: None,
|
||||||
|
stop_event: Optional[threading.Event] = None,
|
||||||
|
):
|
||||||
|
super().__init__(name=f'DownloadThread-{path}')
|
||||||
|
self.path = path
|
||||||
|
self.url = url
|
||||||
|
self.size = size
|
||||||
|
self.timeout = timeout
|
||||||
|
self.state = DownloadState.IDLE
|
||||||
|
self.progress = None
|
||||||
|
self.started_at = None
|
||||||
|
self.ended_at = None
|
||||||
|
self._upstream_stop_event = stop_event or threading.Event()
|
||||||
|
self._stop_event = threading.Event()
|
||||||
|
self._post_event = post_event
|
||||||
|
self._on_start = on_start
|
||||||
|
self._on_close = on_close
|
||||||
|
self._paused = threading.Event()
|
||||||
|
self._downloading = threading.Event()
|
||||||
|
self._last_progress_update_time = 0
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
def should_stop(self) -> bool:
|
||||||
|
return self._stop_event.is_set() or self._upstream_stop_event.is_set()
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def _run(self) -> bool:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def pause(self):
|
||||||
|
self.state = DownloadState.PAUSED
|
||||||
|
self._paused.set()
|
||||||
|
self._downloading.clear()
|
||||||
|
self.post_event(MediaDownloadPausedEvent)
|
||||||
|
|
||||||
|
def resume(self):
|
||||||
|
self.state = DownloadState.DOWNLOADING
|
||||||
|
self._paused.clear()
|
||||||
|
self._downloading.set()
|
||||||
|
self.post_event(MediaDownloadResumedEvent)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
super().run()
|
||||||
|
interrupted = False
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.on_start()
|
||||||
|
interrupted = not self._run()
|
||||||
|
|
||||||
|
if interrupted:
|
||||||
|
self.state = DownloadState.CANCELLED
|
||||||
|
else:
|
||||||
|
self.state = DownloadState.COMPLETED
|
||||||
|
except Exception as e:
|
||||||
|
self.state = DownloadState.ERROR
|
||||||
|
self.post_event(MediaDownloadErrorEvent, error=str(e))
|
||||||
|
self.logger.warning('Error while downloading URL: %s', e)
|
||||||
|
finally:
|
||||||
|
self.on_close()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.state = DownloadState.CANCELLED
|
||||||
|
self._stop_event.set()
|
||||||
|
self._downloading.clear()
|
||||||
|
|
||||||
|
def on_start(self):
|
||||||
|
self.state = DownloadState.STARTED
|
||||||
|
self.started_at = time.time()
|
||||||
|
self.post_event(MediaDownloadStartedEvent)
|
||||||
|
self._on_start(self)
|
||||||
|
|
||||||
|
def on_close(self):
|
||||||
|
self.ended_at = time.time()
|
||||||
|
if self.state == DownloadState.CANCELLED:
|
||||||
|
self.post_event(MediaDownloadCancelledEvent)
|
||||||
|
elif self.state == DownloadState.COMPLETED:
|
||||||
|
self.post_event(MediaDownloadCompletedEvent)
|
||||||
|
|
||||||
|
self._on_close(self)
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
if self.state not in (DownloadState.COMPLETED, DownloadState.CANCELLED):
|
||||||
|
self.logger.info(
|
||||||
|
'Download thread for %s is still active, stopping', self.url
|
||||||
|
)
|
||||||
|
|
||||||
|
self.stop()
|
||||||
|
self.join(timeout=10)
|
||||||
|
|
||||||
|
self.post_event(MediaDownloadClearEvent)
|
||||||
|
|
||||||
|
def post_event(self, event_type: Type[MediaDownloadEvent], **kwargs):
|
||||||
|
kwargs = {
|
||||||
|
'resource': self.url,
|
||||||
|
'path': self.path,
|
||||||
|
'state': self.state.value,
|
||||||
|
'size': self.size,
|
||||||
|
'timeout': self.timeout,
|
||||||
|
'progress': self.progress,
|
||||||
|
'started_at': self.started_at,
|
||||||
|
'ended_at': self.ended_at,
|
||||||
|
**kwargs,
|
||||||
|
}
|
||||||
|
|
||||||
|
self._post_event(event_type, **kwargs)
|
||||||
|
|
||||||
|
def __setattr__(self, name: str, value: Optional[Any], /) -> None:
|
||||||
|
if name == 'progress' and value is not None:
|
||||||
|
if value < 0 or value > 100:
|
||||||
|
self.logger.debug('Invalid progress value:%s', value)
|
||||||
|
return
|
||||||
|
|
||||||
|
prev_progress = getattr(self, 'progress', None)
|
||||||
|
|
||||||
|
if prev_progress is None or (
|
||||||
|
int(prev_progress) != int(value)
|
||||||
|
and (
|
||||||
|
time.time() - self._last_progress_update_time
|
||||||
|
>= self._progress_update_interval
|
||||||
|
)
|
||||||
|
):
|
||||||
|
value = round(value, 2)
|
||||||
|
self._last_progress_update_time = time.time()
|
||||||
|
self.post_event(MediaDownloadProgressEvent, progress=value)
|
||||||
|
|
||||||
|
super().__setattr__(name, value)
|
||||||
|
|
||||||
|
|
||||||
|
class FileDownloadThread(DownloadThread):
|
||||||
|
"""
|
||||||
|
Thread that downloads a generic URL to a file.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
interrupted = False
|
||||||
|
|
||||||
|
with requests.get(self.url, timeout=self.timeout, stream=True) as response:
|
||||||
|
response.raise_for_status()
|
||||||
|
self.size = int(response.headers.get('Content-Length', 0)) or None
|
||||||
|
|
||||||
|
with open(self.path, 'wb') as f:
|
||||||
|
self.on_start()
|
||||||
|
|
||||||
|
for chunk in response.iter_content(chunk_size=8192):
|
||||||
|
if not chunk or self.should_stop():
|
||||||
|
interrupted = self.should_stop()
|
||||||
|
if interrupted:
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
self.state = DownloadState.DOWNLOADING
|
||||||
|
f.write(chunk)
|
||||||
|
percent = f.tell() / self.size * 100 if self.size else 0
|
||||||
|
self.progress = percent
|
||||||
|
|
||||||
|
if self._paused.is_set():
|
||||||
|
wait_for_either(self._downloading, self._stop_event)
|
||||||
|
|
||||||
|
return not interrupted
|
||||||
|
|
||||||
|
|
||||||
|
class YouTubeDownloadThread(DownloadThread):
|
||||||
|
"""
|
||||||
|
Thread that downloads a YouTube URL to a file.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, *args, ytdl: str, youtube_format: Optional[str] = None, **kwargs
|
||||||
|
):
|
||||||
|
super().__init__(*args, **kwargs)
|
||||||
|
self._ytdl = ytdl
|
||||||
|
self._youtube_format = youtube_format
|
||||||
|
self._proc = None
|
||||||
|
self._proc_lock = threading.Lock()
|
||||||
|
|
||||||
|
def _parse_progress(self, line: str):
|
||||||
|
try:
|
||||||
|
progress = json.loads(line)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return
|
||||||
|
|
||||||
|
status = progress.get('status')
|
||||||
|
if not status:
|
||||||
|
return
|
||||||
|
|
||||||
|
if status == 'finished':
|
||||||
|
self.progress = 100
|
||||||
|
return
|
||||||
|
|
||||||
|
if status == 'paused':
|
||||||
|
self.state = DownloadState.PAUSED
|
||||||
|
elif status == 'downloading':
|
||||||
|
self.state = DownloadState.DOWNLOADING
|
||||||
|
|
||||||
|
self.size = int(progress.get('total_bytes_estimate', 0)) or self.size
|
||||||
|
if self.size:
|
||||||
|
downloaded = int(progress.get('downloaded_bytes', 0))
|
||||||
|
self.progress = (downloaded / self.size) * 100
|
||||||
|
|
||||||
|
def _run(self):
|
||||||
|
ytdl_cmd = [
|
||||||
|
self._ytdl,
|
||||||
|
'--newline',
|
||||||
|
'--progress',
|
||||||
|
'--progress-delta',
|
||||||
|
str(self._progress_update_interval),
|
||||||
|
'--progress-template',
|
||||||
|
'%(progress)j',
|
||||||
|
*(['-f', self._youtube_format] if self._youtube_format else []),
|
||||||
|
self.url,
|
||||||
|
'-o',
|
||||||
|
self.path,
|
||||||
|
]
|
||||||
|
|
||||||
|
self.logger.info('Executing command %r', ytdl_cmd)
|
||||||
|
err = None
|
||||||
|
|
||||||
|
with subprocess.Popen(ytdl_cmd, stdout=subprocess.PIPE) as self._proc:
|
||||||
|
if self._proc.stdout:
|
||||||
|
for line in self._proc.stdout:
|
||||||
|
self.logger.debug(
|
||||||
|
'%s output: %s', self._ytdl, line.decode().strip()
|
||||||
|
)
|
||||||
|
|
||||||
|
self._parse_progress(line.decode())
|
||||||
|
|
||||||
|
if self.should_stop():
|
||||||
|
self.stop()
|
||||||
|
return self._proc.returncode == 0
|
||||||
|
|
||||||
|
if self._paused.is_set():
|
||||||
|
wait_for_either(self._downloading, self._stop_event)
|
||||||
|
|
||||||
|
if self._proc.returncode != 0:
|
||||||
|
err = self._proc.stderr.read().decode() if self._proc.stderr else None
|
||||||
|
raise RuntimeError(
|
||||||
|
f'{self._ytdl} failed with return code {self._proc.returncode}: {err}'
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def pause(self):
|
||||||
|
with self._proc_lock:
|
||||||
|
if self._proc:
|
||||||
|
self._proc.send_signal(signal.SIGSTOP)
|
||||||
|
|
||||||
|
super().pause()
|
||||||
|
|
||||||
|
def resume(self):
|
||||||
|
with self._proc_lock:
|
||||||
|
if self._proc:
|
||||||
|
self._proc.send_signal(signal.SIGCONT)
|
||||||
|
|
||||||
|
super().resume()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
state = None
|
||||||
|
|
||||||
|
with suppress(IOError, OSError), self._proc_lock:
|
||||||
|
if self._proc:
|
||||||
|
if self._proc.poll() is None:
|
||||||
|
self._proc.terminate()
|
||||||
|
self._proc.wait(timeout=3)
|
||||||
|
if self._proc.returncode is None:
|
||||||
|
self._proc.kill()
|
||||||
|
|
||||||
|
state = DownloadState.CANCELLED
|
||||||
|
elif self._proc.returncode != 0:
|
||||||
|
state = DownloadState.ERROR
|
||||||
|
else:
|
||||||
|
state = DownloadState.COMPLETED
|
||||||
|
|
||||||
|
self._proc = None
|
||||||
|
|
||||||
|
super().stop()
|
||||||
|
|
||||||
|
if state:
|
||||||
|
self.state = state
|
||||||
|
|
||||||
|
def on_close(self):
|
||||||
|
self.stop()
|
||||||
|
super().on_close()
|
21
platypush/plugins/media/_resource.py
Normal file
21
platypush/plugins/media/_resource.py
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MediaResource:
|
||||||
|
"""
|
||||||
|
Models a media resource
|
||||||
|
"""
|
||||||
|
|
||||||
|
resource: str
|
||||||
|
url: str
|
||||||
|
title: Optional[str] = None
|
||||||
|
description: Optional[str] = None
|
||||||
|
filename: Optional[str] = None
|
||||||
|
image: Optional[str] = None
|
||||||
|
duration: Optional[float] = None
|
||||||
|
channel: Optional[str] = None
|
||||||
|
channel_url: Optional[str] = None
|
||||||
|
type: Optional[str] = None
|
||||||
|
resolution: Optional[str] = None
|
12
platypush/plugins/media/_state.py
Normal file
12
platypush/plugins/media/_state.py
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
import enum
|
||||||
|
|
||||||
|
|
||||||
|
class PlayerState(enum.Enum):
|
||||||
|
"""
|
||||||
|
Models the possible states of a media player
|
||||||
|
"""
|
||||||
|
|
||||||
|
STOP = 'stop'
|
||||||
|
PLAY = 'play'
|
||||||
|
PAUSE = 'pause'
|
||||||
|
IDLE = 'idle'
|
|
@ -179,21 +179,6 @@ class MediaKodiPlugin(MediaPlugin):
|
||||||
:param resource: URL or path to the media to be played
|
:param resource: URL or path to the media to be played
|
||||||
"""
|
"""
|
||||||
|
|
||||||
youtube_id = self.get_youtube_id(resource)
|
|
||||||
if youtube_id:
|
|
||||||
try:
|
|
||||||
resource = self.get_youtube_url(youtube_id).output
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.warning(
|
|
||||||
'youtube-dl error, falling back to Kodi YouTube plugin: {}'.format(
|
|
||||||
str(e)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
resource = (
|
|
||||||
'plugin://plugin.video.youtube/?action=play_video&videoid='
|
|
||||||
+ youtube_id
|
|
||||||
)
|
|
||||||
|
|
||||||
if resource.startswith('file://'):
|
if resource.startswith('file://'):
|
||||||
resource = resource[7:]
|
resource = resource[7:]
|
||||||
|
|
||||||
|
@ -585,7 +570,7 @@ class MediaKodiPlugin(MediaPlugin):
|
||||||
:type position: float
|
:type position: float
|
||||||
:param player_id: ID of the target player (default: configured/current player).
|
:param player_id: ID of the target player (default: configured/current player).
|
||||||
"""
|
"""
|
||||||
return self.seek(position=position, player_id=player_id, *args, **kwargs)
|
return self.seek(*args, position=position, player_id=player_id, **kwargs)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def back(self, offset=30, player_id=None, *args, **kwargs):
|
def back(self, offset=30, player_id=None, *args, **kwargs):
|
||||||
|
|
65
platypush/schemas/media/download.py
Normal file
65
platypush/schemas/media/download.py
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
from marshmallow import fields
|
||||||
|
from marshmallow.schema import Schema
|
||||||
|
|
||||||
|
from platypush.plugins.media import DownloadState
|
||||||
|
from platypush.schemas import DateTime
|
||||||
|
|
||||||
|
|
||||||
|
class MediaDownloadSchema(Schema):
|
||||||
|
"""
|
||||||
|
Media download schema.
|
||||||
|
"""
|
||||||
|
|
||||||
|
url = fields.URL(
|
||||||
|
required=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download URL",
|
||||||
|
"example": "https://example.com/video.mp4",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
path = fields.String(
|
||||||
|
required=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download path",
|
||||||
|
"example": "/path/to/download/video.mp4",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
state = fields.Enum(
|
||||||
|
DownloadState,
|
||||||
|
required=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download state",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
size = fields.Integer(
|
||||||
|
nullable=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download size (bytes)",
|
||||||
|
"example": 1024,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
timeout = fields.Integer(
|
||||||
|
nullable=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download timeout (seconds)",
|
||||||
|
"example": 60,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
started_at = DateTime(
|
||||||
|
nullable=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download start time",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
ended_at = DateTime(
|
||||||
|
nullable=True,
|
||||||
|
metadata={
|
||||||
|
"description": "Download end time",
|
||||||
|
},
|
||||||
|
)
|
Loading…
Reference in a new issue