diff --git a/platypush/message/event/torrent.py b/platypush/message/event/torrent.py index e3d3d38a3..43c4fea09 100644 --- a/platypush/message/event/torrent.py +++ b/platypush/message/event/torrent.py @@ -10,6 +10,15 @@ class TorrentEvent(Event): super().__init__(*args, **kwargs) +class TorrentDownloadingMetadataEvent(TorrentEvent): + """ + Event triggered upon torrent metadata download start + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + class TorrentDownloadStartEvent(TorrentEvent): """ Event triggered upon torrent download start diff --git a/platypush/plugins/media/__init__.py b/platypush/plugins/media/__init__.py index 681aea406..6d8ec6b5a 100644 --- a/platypush/plugins/media/__init__.py +++ b/platypush/plugins/media/__init__.py @@ -27,6 +27,10 @@ class MediaPlugin(Plugin): * **youtube-dl** installed on your system (see your distro instructions), optional for YouTube support """ + # A media plugin can either be local or remote (e.g. control media on + # another device) + _is_local = True + _NOT_IMPLEMENTED_ERR = NotImplementedError( 'This method must be implemented in a derived class') @@ -360,5 +364,8 @@ class MediaPlugin(Plugin): return proc.stdout.read().decode("utf-8", "strict")[:-1] + def is_local(self): + return self._is_local + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/media/mplayer.py b/platypush/plugins/media/mplayer.py index c145c5574..4f480b296 100644 --- a/platypush/plugins/media/mplayer.py +++ b/platypush/plugins/media/mplayer.py @@ -72,8 +72,9 @@ class MediaMplayerPlugin(MediaPlugin): self.args = args or [] self._init_mplayer_bin() self._build_actions() - self._mplayer = None + self._player = None self._mplayer_timeout = mplayer_timeout + self._mplayer_stopped_event = threading.Event() self._is_playing_torrent = False @@ -98,9 +99,9 @@ class MediaMplayerPlugin(MediaPlugin): self.mplayer_bin = mplayer_bin def _init_mplayer(self, mplayer_args=None): - if self._mplayer: + if self._player: try: - self._mplayer.quit() + self._player.quit() except: self.logger.debug('Failed to quit mplayer before _exec: {}'. format(str)) @@ -119,7 +120,7 @@ class MediaMplayerPlugin(MediaPlugin): if self._env: popen_args['env'] = self._env - self._mplayer = subprocess.Popen(args, **popen_args) + self._player = subprocess.Popen(args, **popen_args) threading.Thread(target=self._process_monitor()).start() def _build_actions(self): @@ -155,7 +156,7 @@ class MediaMplayerPlugin(MediaPlugin): if cmd_name == 'loadfile' or cmd_name == 'loadlist': self._init_mplayer(mplayer_args) else: - if not self._mplayer: + if not self._player: self.logger.warning('MPlayer is not running') cmd = '{}{}{}{}\n'.format( @@ -163,8 +164,8 @@ class MediaMplayerPlugin(MediaPlugin): cmd_name, ' ' if args else '', ' '.join(repr(a) for a in args)).encode() - self._mplayer.stdin.write(cmd) - self._mplayer.stdin.flush() + self._player.stdin.write(cmd) + self._player.stdin.flush() bus = get_bus() if cmd_name == 'loadfile' or cmd_name == 'loadlist': @@ -173,23 +174,23 @@ class MediaMplayerPlugin(MediaPlugin): bus.post(MediaPauseEvent()) elif cmd_name == 'quit' or cmd_name == 'stop': if cmd_name == 'quit': - self._mplayer.terminate() - self._mplayer.wait() - try: self._mplayer.kill() + self._player.terminate() + self._player.wait() + try: self._player.kill() except: pass - self._mplayer = None + self._player = None if not wait_for_response: return poll = select.poll() - poll.register(self._mplayer.stdout, select.POLLIN) + poll.register(self._player.stdout, select.POLLIN) last_read_time = time.time() while time.time() - last_read_time < self._mplayer_timeout: result = poll.poll(0) if result: - line = self._mplayer.stdout.readline().decode() + line = self._player.stdout.readline().decode() last_read_time = time.time() if line.startswith('ANS_'): @@ -222,15 +223,17 @@ class MediaMplayerPlugin(MediaPlugin): def _process_monitor(self): def _thread(): - if not self._mplayer: + if not self._player: return - self._mplayer.wait() + self._mplayer_stopped_event.clear() + self._player.wait() try: self.quit() except: pass get_bus().post(MediaStopEvent()) - self._mplayer = None + self._mplayer_stopped_event.set() + self._player = None return _thread diff --git a/platypush/plugins/media/omxplayer.py b/platypush/plugins/media/omxplayer.py index ae50d062b..c512c6b31 100644 --- a/platypush/plugins/media/omxplayer.py +++ b/platypush/plugins/media/omxplayer.py @@ -1,3 +1,4 @@ +import enum import os import re import subprocess @@ -13,6 +14,12 @@ from platypush.message.event.video import VideoPlayEvent, VideoPauseEvent, \ from platypush.plugins import action +class PlayerEvent(enum.Enum): + STOP = 'stop' + PLAY = 'play' + PAUSE = 'pause' + + class MediaOmxplayerPlugin(MediaPlugin): """ Plugin to control video and media playback using OMXPlayer. @@ -36,6 +43,7 @@ class MediaOmxplayerPlugin(MediaPlugin): self.args = args self._player = None + self._handlers = { e.value: [] for e in PlayerEvent } @action def play(self, resource): @@ -278,19 +286,36 @@ class MediaOmxplayerPlugin(MediaPlugin): redis.send_message(msg) + def add_handler(self, event_type, callback): + if event_type not in self._handlers.keys(): + raise AttributeError('{} is not a valid PlayerEvent type'. + format(event_type)) + + self._handlers[event_type].append(callback) + def on_play(self): def _f(player): - self.send_message(VideoPlayEvent(video=self._player.get_source())) + video = self._player.get_source() + self.send_message(VideoPlayEvent(video=video)) + for callback in self._handlers[PlayerEvent.PLAY.value]: + callback(video) + return _f def on_pause(self): def _f(player): - self.send_message(VideoPauseEvent(video=self._player.get_source())) + video = self._player.get_source() + self.send_message(VideoPauseEvent(video=video)) + for callback in self._handlers[PlayerEvent.PAUSE.value]: + callback(video) + return _f def on_stop(self): def _f(player): self.send_message(VideoStopEvent()) + for callback in self._handlers[PlayerEvent.STOP.value]: + callback() return _f diff --git a/platypush/plugins/media/webtorrent.py b/platypush/plugins/media/webtorrent.py index b90cab3b9..1d2884e95 100644 --- a/platypush/plugins/media/webtorrent.py +++ b/platypush/plugins/media/webtorrent.py @@ -1,8 +1,9 @@ import datetime +import enum import os +import re import select import subprocess -import tempfile import threading import time @@ -11,12 +12,20 @@ from platypush.context import get_bus, get_plugin from platypush.message.response import Response from platypush.plugins.media import PlayerState, MediaPlugin from platypush.message.event.torrent import TorrentDownloadStartEvent, \ - TorrentDownloadCompletedEvent + TorrentDownloadCompletedEvent, TorrentDownloadProgressEvent, \ + TorrentDownloadingMetadataEvent from platypush.plugins import action -from platypush.utils import find_bins_in_path +from platypush.utils import find_bins_in_path, find_files_by_ext, \ + is_process_alive +class TorrentState(enum.Enum): + IDLE = 1 + DOWNLOADING_METADATA = 2 + DOWNLOADING = 3 + DOWNLOADED = 4 + class MediaWebtorrentPlugin(MediaPlugin): """ Plugin to download and stream videos using webtorrent @@ -38,7 +47,8 @@ class MediaWebtorrentPlugin(MediaPlugin): # Download at least 10 MBs before starting streaming _download_size_before_streaming = 10 * 2**20 - def __init__(self, webtorrent_bin=None, *args, **kwargs): + def __init__(self, webtorrent_bin=None, webtorrent_port=None, *args, + **kwargs): """ media.webtorrent will use the default media player plugin you have configured (e.g. mplayer, omxplayer) to stream the torrent. @@ -46,10 +56,15 @@ class MediaWebtorrentPlugin(MediaPlugin): :param webtorrent_bin: Path to your webtorrent executable. If not set, then Platypush will search for the right executable in your PATH :type webtorrent_bin: str + + :param webtorrent_port: Port where the webtorrent will be running + streaming server will be running (default: 8000) + :type webtorrent_port: int """ super().__init__(*args, **kwargs) + self.webtorrent_port = webtorrent_port self._init_webtorrent_bin(webtorrent_bin=webtorrent_bin) self._init_media_player() @@ -97,38 +112,142 @@ class MediaWebtorrentPlugin(MediaPlugin): self._supported_media_plugins)) - def _process_monitor(self, resource, output_file): + def _read_process_line(self): + line = self._webtorrent_process.stdout.readline().decode().strip() + # Strip output of the colors + return re.sub('\x1b\[((\d+m)|(.{1,2}))', '', line).strip() + + + def _process_monitor(self, resource, download_dir): def _thread(): if not self._webtorrent_process: return + ###### + state = TorrentState.IDLE bus = get_bus() - bus.post(TorrentDownloadStartEvent(resource=resource)) + webtorrent_url = None + media_file = None + poll = select.poll() + poll.register(self._webtorrent_process.stdout, select.POLLIN) + + # First wait for the metadata to be ready and the streaming started while True: + result = poll.poll(0) + if not result: + continue + + if not self._is_process_alive(): + break + + line = self._read_process_line() + + if 'fetching torrent metadata from' in line.lower() \ + and state == TorrentState.IDLE: + # IDLE -> DOWNLOADING_METADATA + state = TorrentState.DOWNLOADING_METADATA + bus.post(TorrentDownloadingMetadataEvent(resource=resource)) + elif 'downloading: ' in line.lower() \ + and media_file is None: + # Find video files in torrent directory + output_dir = os.path.join( + download_dir, re.search( + 'downloading: (.+?)$', line, flags=re.IGNORECASE + ).group(1)) + + media_files = sorted(find_files_by_ext( + output_dir, *self._media_plugin.video_extensions)) + + if not media_files: + raise RuntimeError('No video files found in {}'. + format(output_dir)) + + media_file = os.path.join(output_dir, media_files[0]) + elif 'server running at: ' in line.lower() \ + and webtorrent_url is None: + # Streaming started + webtorrent_url = re.search('server running at: (.+?)$', + line, flags=re.IGNORECASE).group(1) + self.logger.info('Torrent stream started on {}'.format( + webtorrent_url)) + + if state.value <= TorrentState.DOWNLOADING_METADATA.value \ + and media_file and webtorrent_url: + # DOWNLOADING_METADATA -> DOWNLOADING + state = TorrentState.DOWNLOADING + bus.post(TorrentDownloadStartEvent( + resource=resource, media_file=media_file, + stream_url=webtorrent_url)) + break + + + if not media_file or not webtorrent_url: + raise RuntimeError('The webtorrent process did not ' + + 'provide the required data') + + # Then wait until we have enough chunks to start the player + while True: + result = poll.poll(0) + if not result: + continue + + if not self._is_process_alive(): + break + try: - if os.path.getsize(output_file) > \ + if os.path.getsize(media_file) > \ self._download_size_before_streaming: break except FileNotFoundError: - pass + continue - self._media_plugin.play(output_file) - self._webtorrent_process.wait() + self.logger.info( + 'Starting playback of {} to {} through {}'.format( + media_file, self._media_plugin.__class__.__name__, + webtorrent_url)) + + media = media_file if self._media_plugin.is_local() \ + else webtorrent_url + + self._media_plugin.play(media) + self.logger.info('Waiting for player to terminate') + self._wait_for_player() + self.logger.info('Torrent player terminated') bus.post(TorrentDownloadCompletedEvent(resource=resource)) - self._webtorrent_process = None + + try: self.quit() + except: pass + self.logger.info('WebTorrent process terminated') return _thread + def _wait_for_player(self): + media_cls = self._media_plugin.__class__.__name__ + stop_evt = None - def _get_torrent_download_path(self): - if self._media_plugin.download_dir: - # TODO set proper file name based on the torrent metadata - return os.path.join(self._media_plugin.download_dir, - 'torrent_media_' + datetime.datetime. - today().strftime('%Y-%m-%d_%H-%M-%S-%f')) + if media_cls == 'MediaMplayerPlugin': + stop_evt = self._media_plugin._mplayer_stopped_event + elif media_cls == 'MediaOmxplayerPlugin': + stop_evt = threading.Event() + def stop_callback(): + stop_evt.set() + self._media_plugin.add_handler('stop', stop_callback) + + if stop_evt: + stop_evt.wait() else: - return tempfile.NamedTemporaryFile(delete=False).name + # Fallback: wait for the webtorrent process to terminate + self._webtorrent_process.wait() + + + def _get_torrent_download_dir(self): + if self._media_plugin.download_dir: + return self._media_plugin.download_dir + else: + d = os.path.join(os.environ['HOME'], 'Downloads') + os.makedirs(d, exist_ok=True) + return d @action @@ -148,16 +267,18 @@ class MediaWebtorrentPlugin(MediaPlugin): self.logger.debug('Failed to quit the previous instance: {}'. format(str)) - output_file = self._get_torrent_download_path() - webtorrent_args = [self.webtorrent_bin, '--stdout', resource] + download_dir = self._get_torrent_download_dir() + webtorrent_args = [self.webtorrent_bin, 'download', '-o', download_dir] - with open(output_file, 'w') as f: - self._webtorrent_process = subprocess.Popen(webtorrent_args, - stdout=f) + if self.webtorrent_port: + webtorrent_args += ['-p', self.webtorrent_port] - threading.Thread(target=self._process_monitor( - resource=resource, output_file=output_file)).start() + webtorrent_args += [resource] + self._webtorrent_process = subprocess.Popen(webtorrent_args, + stdout=subprocess.PIPE) + threading.Thread(target=self._process_monitor( + resource=resource, download_dir=download_dir)).start() return { 'resource': resource } @@ -169,8 +290,7 @@ class MediaWebtorrentPlugin(MediaPlugin): @action def quit(self): """ Quit the player """ - if self._webtorrent_process and self._is_process_alive( - self._webtorrent_process.pid): + if self._is_process_alive(): self._webtorrent_process.terminate() self._webtorrent_process.wait() try: self._webtorrent_process.kill() @@ -186,15 +306,8 @@ class MediaWebtorrentPlugin(MediaPlugin): return self.play(resource) def _is_process_alive(self): - if not self._webtorrent_process: - return False - - try: - os.kill(self._webtorrent_process.pid, 0) - return True - except OSError: - self._webtorrent_process = None - return False + return is_process_alive(self._webtorrent_process.pid) \ + if self._webtorrent_process else False @action def status(self): diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 123de1da4..ad9a43af9 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -180,4 +180,34 @@ def find_bins_in_path(bin_name): os.access(os.path.join(p, bin_name), os.X_OK))] +def find_files_by_ext(directory, *exts): + """ + Finds all the files in the given directory with the provided extensions + """ + + if not exts: + raise AttributeError('No extensions provided') + + if not os.path.isdir(directory): + raise AttributeError('{} is not a valid directory'.format(directory)) + + min_len = len(min(exts, key=len)) + max_len = len(max(exts, key=len)) + result = [] + + for root, dirs, files in os.walk(directory): + for i in range(min_len, max_len+1): + result += [f for f in files if f[-i:] in exts] + + return result + + +def is_process_alive(pid): + try: + os.kill(pid, 0) + return True + except OSError: + return False + + # vim:sw=4:ts=4:et: