From 52d0ba442bf57f9ffee03690d4c76b9f5816b1e0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 5 Feb 2019 00:15:36 +0100 Subject: [PATCH] Refactored webtorrent plugin and synchronization with the player The WebTorrent plugin now uses the `download -o -p ` options. Improved interaction both with the webtorrent executable and the player executable, and triggering the right events upon state changes, as well as free support for torrent streaming through the webtorrent executable. --- platypush/message/event/torrent.py | 9 ++ platypush/plugins/media/__init__.py | 7 + platypush/plugins/media/mplayer.py | 35 ++--- platypush/plugins/media/omxplayer.py | 29 +++- platypush/plugins/media/webtorrent.py | 185 +++++++++++++++++++++----- platypush/utils/__init__.py | 30 +++++ 6 files changed, 241 insertions(+), 54 deletions(-) diff --git a/platypush/message/event/torrent.py b/platypush/message/event/torrent.py index e3d3d38a39..43c4fea09a 100644 --- a/platypush/message/event/torrent.py +++ b/platypush/message/event/torrent.py @@ -10,6 +10,15 @@ class TorrentEvent(Event): super().__init__(*args, **kwargs) +class TorrentDownloadingMetadataEvent(TorrentEvent): + """ + Event triggered upon torrent metadata download start + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + class TorrentDownloadStartEvent(TorrentEvent): """ Event triggered upon torrent download start diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index 681aea406d..6d8ec6b5a0 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -27,6 +27,10 @@ class MediaPlugin(Plugin): * **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support """ + # A media plugin can either be local or remote (e.g. control media on + # another device) + _is_local = True + _NOT_IMPLEMENTED_ERR = NotImplementedError( 'This method must be implemented in a derived class') @@ -360,5 +364,8 @@ class MediaPlugin(Plugin): return proc.stdout.read().decode("utf-8", "strict")[:-1] + def is_local(self): + return self._is_local + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/mplayer.py b/platypush/plugins/media/mplayer.py index c145c55744..4f480b2962 100644 --- a/platypush/plugins/media/mplayer.py +++ b/platypush/plugins/media/mplayer.py @@ -72,8 +72,9 @@ class MediaMplayerPlugin(MediaPlugin): self.args = args or [] self._init_mplayer_bin() self._build_actions() - self._mplayer = None + self._player = None self._mplayer_timeout = mplayer_timeout + self._mplayer_stopped_event = threading.Event() self._is_playing_torrent = False @@ -98,9 +99,9 @@ class MediaMplayerPlugin(MediaPlugin): self.mplayer_bin = mplayer_bin def _init_mplayer(self, mplayer_args=None): - if self._mplayer: + if self._player: try: - self._mplayer.quit() + self._player.quit() except: self.logger.debug('Failed to quit mplayer before _exec: {}'. format(str)) @@ -119,7 +120,7 @@ class MediaMplayerPlugin(MediaPlugin): if self._env: popen_args['env'] = self._env - self._mplayer = subprocess.Popen(args, **popen_args) + self._player = subprocess.Popen(args, **popen_args) threading.Thread(target=self._process_monitor()).start() def _build_actions(self): @@ -155,7 +156,7 @@ class MediaMplayerPlugin(MediaPlugin): if cmd_name == 'loadfile' or cmd_name == 'loadlist': self._init_mplayer(mplayer_args) else: - if not self._mplayer: + if not self._player: self.logger.warning('MPlayer is not running') cmd = '{}{}{}{}\n'.format( @@ -163,8 +164,8 @@ class MediaMplayerPlugin(MediaPlugin): cmd_name, ' ' if args else '', ' '.join(repr(a) for a in args)).encode() - self._mplayer.stdin.write(cmd) - self._mplayer.stdin.flush() + self._player.stdin.write(cmd) + self._player.stdin.flush() bus = get_bus() if cmd_name == 'loadfile' or cmd_name == 'loadlist': @@ -173,23 +174,23 @@ class MediaMplayerPlugin(MediaPlugin): bus.post(MediaPauseEvent()) elif cmd_name == 'quit' or cmd_name == 'stop': if cmd_name == 'quit': - self._mplayer.terminate() - self._mplayer.wait() - try: self._mplayer.kill() + self._player.terminate() + self._player.wait() + try: self._player.kill() except: pass - self._mplayer = None + self._player = None if not wait_for_response: return poll = select.poll() - poll.register(self._mplayer.stdout, select.POLLIN) + poll.register(self._player.stdout, select.POLLIN) last_read_time = time.time() while time.time() - last_read_time < self._mplayer_timeout: result = poll.poll(0) if result: - line = self._mplayer.stdout.readline().decode() + line = self._player.stdout.readline().decode() last_read_time = time.time() if line.startswith('ANS_'): @@ -222,15 +223,17 @@ class MediaMplayerPlugin(MediaPlugin): def _process_monitor(self): def _thread(): - if not self._mplayer: + if not self._player: return - self._mplayer.wait() + self._mplayer_stopped_event.clear() + self._player.wait() try: self.quit() except: pass get_bus().post(MediaStopEvent()) - self._mplayer = None + self._mplayer_stopped_event.set() + self._player = None return _thread diff --git a/platypush/plugins/media/omxplayer.py b/platypush/plugins/media/omxplayer.py index ae50d062b1..c512c6b31f 100644 --- a/platypush/plugins/media/omxplayer.py +++ b/platypush/plugins/media/omxplayer.py @@ -1,3 +1,4 @@ +import enum import os import re import subprocess @@ -13,6 +14,12 @@ from platypush.message.event.video import VideoPlayEvent, VideoPauseEvent, \ from platypush.plugins import action +class PlayerEvent(enum.Enum): + STOP = 'stop' + PLAY = 'play' + PAUSE = 'pause' + + class MediaOmxplayerPlugin(MediaPlugin): """ Plugin to control video and media playback using OMXPlayer. @@ -36,6 +43,7 @@ class MediaOmxplayerPlugin(MediaPlugin): self.args = args self._player = None + self._handlers = { e.value: [] for e in PlayerEvent } @action def play(self, resource): @@ -278,19 +286,36 @@ class MediaOmxplayerPlugin(MediaPlugin): redis.send_message(msg) + def add_handler(self, event_type, callback): + if event_type not in self._handlers.keys(): + raise AttributeError('{} is not a valid PlayerEvent type'. + format(event_type)) + + self._handlers[event_type].append(callback) + def on_play(self): def _f(player): - self.send_message(VideoPlayEvent(video=self._player.get_source())) + video = self._player.get_source() + self.send_message(VideoPlayEvent(video=video)) + for callback in self._handlers[PlayerEvent.PLAY.value]: + callback(video) + return _f def on_pause(self): def _f(player): - self.send_message(VideoPauseEvent(video=self._player.get_source())) + video = self._player.get_source() + self.send_message(VideoPauseEvent(video=video)) + for callback in self._handlers[PlayerEvent.PAUSE.value]: + callback(video) + return _f def on_stop(self): def _f(player): self.send_message(VideoStopEvent()) + for callback in self._handlers[PlayerEvent.STOP.value]: + callback() return _f diff --git a/platypush/plugins/media/webtorrent.py b/platypush/plugins/media/webtorrent.py index b90cab3b9b..1d2884e95d 100644 --- a/platypush/plugins/media/webtorrent.py +++ b/platypush/plugins/media/webtorrent.py @@ -1,8 +1,9 @@ import datetime +import enum import os +import re import select import subprocess -import tempfile import threading import time @@ -11,12 +12,20 @@ from platypush.context import get_bus, get_plugin from platypush.message.response import Response from platypush.plugins.media import PlayerState, MediaPlugin from platypush.message.event.torrent import TorrentDownloadStartEvent, \ - TorrentDownloadCompletedEvent + TorrentDownloadCompletedEvent, TorrentDownloadProgressEvent, \ + TorrentDownloadingMetadataEvent from platypush.plugins import action -from platypush.utils import find_bins_in_path +from platypush.utils import find_bins_in_path, find_files_by_ext, \ + is_process_alive +class TorrentState(enum.Enum): + IDLE = 1 + DOWNLOADING_METADATA = 2 + DOWNLOADING = 3 + DOWNLOADED = 4 + class MediaWebtorrentPlugin(MediaPlugin): """ Plugin to download and stream videos using webtorrent @@ -38,7 +47,8 @@ class MediaWebtorrentPlugin(MediaPlugin): # Download at least 10 MBs before starting streaming _download_size_before_streaming = 10 * 2**20 - def __init__(self, webtorrent_bin=None, *args, **kwargs): + def __init__(self, webtorrent_bin=None, webtorrent_port=None, *args, + **kwargs): """ media.webtorrent will use the default media player plugin you have configured (e.g. mplayer, omxplayer) to stream the torrent. @@ -46,10 +56,15 @@ class MediaWebtorrentPlugin(MediaPlugin): :param webtorrent_bin: Path to your webtorrent executable. If not set, then Platypush will search for the right executable in your PATH :type webtorrent_bin: str + + :param webtorrent_port: Port where the webtorrent will be running + streaming server will be running (default: 8000) + :type webtorrent_port: int """ super().__init__(*args, **kwargs) + self.webtorrent_port = webtorrent_port self._init_webtorrent_bin(webtorrent_bin=webtorrent_bin) self._init_media_player() @@ -97,38 +112,142 @@ class MediaWebtorrentPlugin(MediaPlugin): self._supported_media_plugins)) - def _process_monitor(self, resource, output_file): + def _read_process_line(self): + line = self._webtorrent_process.stdout.readline().decode().strip() + # Strip output of the colors + return re.sub('\x1b\[((\d+m)|(.{1,2}))', '', line).strip() + + + def _process_monitor(self, resource, download_dir): def _thread(): if not self._webtorrent_process: return + ###### + state = TorrentState.IDLE bus = get_bus() - bus.post(TorrentDownloadStartEvent(resource=resource)) + webtorrent_url = None + media_file = None + poll = select.poll() + poll.register(self._webtorrent_process.stdout, select.POLLIN) + + # First wait for the metadata to be ready and the streaming started while True: + result = poll.poll(0) + if not result: + continue + + if not self._is_process_alive(): + break + + line = self._read_process_line() + + if 'fetching torrent metadata from' in line.lower() \ + and state == TorrentState.IDLE: + # IDLE -> DOWNLOADING_METADATA + state = TorrentState.DOWNLOADING_METADATA + bus.post(TorrentDownloadingMetadataEvent(resource=resource)) + elif 'downloading: ' in line.lower() \ + and media_file is None: + # Find video files in torrent directory + output_dir = os.path.join( + download_dir, re.search( + 'downloading: (.+?)$', line, flags=re.IGNORECASE + ).group(1)) + + media_files = sorted(find_files_by_ext( + output_dir, *self._media_plugin.video_extensions)) + + if not media_files: + raise RuntimeError('No video files found in {}'. + format(output_dir)) + + media_file = os.path.join(output_dir, media_files[0]) + elif 'server running at: ' in line.lower() \ + and webtorrent_url is None: + # Streaming started + webtorrent_url = re.search('server running at: (.+?)$', + line, flags=re.IGNORECASE).group(1) + self.logger.info('Torrent stream started on {}'.format( + webtorrent_url)) + + if state.value <= TorrentState.DOWNLOADING_METADATA.value \ + and media_file and webtorrent_url: + # DOWNLOADING_METADATA -> DOWNLOADING + state = TorrentState.DOWNLOADING + bus.post(TorrentDownloadStartEvent( + resource=resource, media_file=media_file, + stream_url=webtorrent_url)) + break + + + if not media_file or not webtorrent_url: + raise RuntimeError('The webtorrent process did not ' + + 'provide the required data') + + # Then wait until we have enough chunks to start the player + while True: + result = poll.poll(0) + if not result: + continue + + if not self._is_process_alive(): + break + try: - if os.path.getsize(output_file) > \ + if os.path.getsize(media_file) > \ self._download_size_before_streaming: break except FileNotFoundError: - pass + continue - self._media_plugin.play(output_file) - self._webtorrent_process.wait() + self.logger.info( + 'Starting playback of {} to {} through {}'.format( + media_file, self._media_plugin.__class__.__name__, + webtorrent_url)) + + media = media_file if self._media_plugin.is_local() \ + else webtorrent_url + + self._media_plugin.play(media) + self.logger.info('Waiting for player to terminate') + self._wait_for_player() + self.logger.info('Torrent player terminated') bus.post(TorrentDownloadCompletedEvent(resource=resource)) - self._webtorrent_process = None + + try: self.quit() + except: pass + self.logger.info('WebTorrent process terminated') return _thread + def _wait_for_player(self): + media_cls = self._media_plugin.__class__.__name__ + stop_evt = None - def _get_torrent_download_path(self): - if self._media_plugin.download_dir: - # TODO set proper file name based on the torrent metadata - return os.path.join(self._media_plugin.download_dir, - 'torrent_media_' + datetime.datetime. - today().strftime('%Y-%m-%d_%H-%M-%S-%f')) + if media_cls == 'MediaMplayerPlugin': + stop_evt = self._media_plugin._mplayer_stopped_event + elif media_cls == 'MediaOmxplayerPlugin': + stop_evt = threading.Event() + def stop_callback(): + stop_evt.set() + self._media_plugin.add_handler('stop', stop_callback) + + if stop_evt: + stop_evt.wait() else: - return tempfile.NamedTemporaryFile(delete=False).name + # Fallback: wait for the webtorrent process to terminate + self._webtorrent_process.wait() + + + def _get_torrent_download_dir(self): + if self._media_plugin.download_dir: + return self._media_plugin.download_dir + else: + d = os.path.join(os.environ['HOME'], 'Downloads') + os.makedirs(d, exist_ok=True) + return d @action @@ -148,16 +267,18 @@ class MediaWebtorrentPlugin(MediaPlugin): self.logger.debug('Failed to quit the previous instance: {}'. format(str)) - output_file = self._get_torrent_download_path() - webtorrent_args = [self.webtorrent_bin, '--stdout', resource] + download_dir = self._get_torrent_download_dir() + webtorrent_args = [self.webtorrent_bin, 'download', '-o', download_dir] - with open(output_file, 'w') as f: - self._webtorrent_process = subprocess.Popen(webtorrent_args, - stdout=f) + if self.webtorrent_port: + webtorrent_args += ['-p', self.webtorrent_port] - threading.Thread(target=self._process_monitor( - resource=resource, output_file=output_file)).start() + webtorrent_args += [resource] + self._webtorrent_process = subprocess.Popen(webtorrent_args, + stdout=subprocess.PIPE) + threading.Thread(target=self._process_monitor( + resource=resource, download_dir=download_dir)).start() return { 'resource': resource } @@ -169,8 +290,7 @@ class MediaWebtorrentPlugin(MediaPlugin): @action def quit(self): """ Quit the player """ - if self._webtorrent_process and self._is_process_alive( - self._webtorrent_process.pid): + if self._is_process_alive(): self._webtorrent_process.terminate() self._webtorrent_process.wait() try: self._webtorrent_process.kill() @@ -186,15 +306,8 @@ class MediaWebtorrentPlugin(MediaPlugin): return self.play(resource) def _is_process_alive(self): - if not self._webtorrent_process: - return False - - try: - os.kill(self._webtorrent_process.pid, 0) - return True - except OSError: - self._webtorrent_process = None - return False + return is_process_alive(self._webtorrent_process.pid) \ + if self._webtorrent_process else False @action def status(self): diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 123de1da4a..ad9a43af9a 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -180,4 +180,34 @@ def find_bins_in_path(bin_name): os.access(os.path.join(p, bin_name), os.X_OK))] +def find_files_by_ext(directory, *exts): + """ + Finds all the files in the given directory with the provided extensions + """ + + if not exts: + raise AttributeError('No extensions provided') + + if not os.path.isdir(directory): + raise AttributeError('{} is not a valid directory'.format(directory)) + + min_len = len(min(exts, key=len)) + max_len = len(max(exts, key=len)) + result = [] + + for root, dirs, files in os.walk(directory): + for i in range(min_len, max_len+1): + result += [f for f in files if f[-i:] in exts] + + return result + + +def is_process_alive(pid): + try: + os.kill(pid, 0) + return True + except OSError: + return False + + # vim:sw=4:ts=4:et: