diff --git a/platypush/plugins/media/mplayer/__init__.py b/platypush/plugins/media/mplayer/__init__.py index d3ab6c8d..e5ca66e5 100644 --- a/platypush/plugins/media/mplayer/__init__.py +++ b/platypush/plugins/media/mplayer/__init__.py @@ -1,30 +1,57 @@ -from dataclasses import asdict +import json import os import re -import select import subprocess import threading import time +from dataclasses import asdict, dataclass +from multiprocessing import Process, Queue, RLock +from queue import Empty from typing import Any, Collection, Dict, List, Optional -from platypush.context import get_bus from platypush.message.response import Response +from platypush.plugins import action from platypush.plugins.media import PlayerState, MediaPlugin from platypush.message.event.media import ( MediaPlayEvent, MediaPlayRequestEvent, MediaPauseEvent, + MediaResumeEvent, MediaStopEvent, NewPlayingMediaEvent, ) - -from platypush.plugins import action from platypush.utils import find_bins_in_path +@dataclass +class MplayerStatus: + """ + MPlayer status object + """ + + state: PlayerState = PlayerState.STOP + filename: Optional[str] = None + path: Optional[str] = None + title: Optional[str] = None + duration: Optional[float] = None + position: Optional[float] = None + percent_pos: Optional[float] = None + fullscreen: Optional[bool] = None + mute: Optional[bool] = None + pause: Optional[bool] = None + volume: Optional[float] = None + volume_max: Optional[float] = None + seekable: Optional[bool] = None + url: Optional[str] = None + + class MediaMplayerPlugin(MediaPlugin): """ Plugin to control MPlayer instances. + + Note that some plugin methods are populated dynamically by introspecting the + mplayer executable. You can verify the supported methods at runtime by + running the :meth:`.list_actions` action. """ _mplayer_default_communicate_timeout = 0.5 @@ -41,16 +68,15 @@ class MediaMplayerPlugin(MediaPlugin): def __init__( self, + fullscreen: bool = False, mplayer_bin: Optional[str] = None, mplayer_timeout: float = _mplayer_default_communicate_timeout, args: Optional[Collection[str]] = None, **kwargs, ): """ - Create the MPlayer wrapper. Note that the plugin methods are populated - dynamically by introspecting the mplayer executable. You can verify the - supported methods at runtime by using the `list_actions` method. - + :param fullscreen: If set to True then the player will be started in + fullscreen mode (default: False) :param mplayer_bin: Path to the MPlayer executable (default: search for the first occurrence in your system PATH environment variable) :param mplayer_timeout: Timeout in seconds to wait for more data @@ -63,11 +89,16 @@ class MediaMplayerPlugin(MediaPlugin): self.args = args or [] self._init_mplayer_bin(mplayer_bin=mplayer_bin) + self._fullscreen = fullscreen self._build_actions() self._player = None self._mplayer_timeout = mplayer_timeout - self._mplayer_stopped_event = threading.Event() self._status_lock = threading.Lock() + self._status = MplayerStatus() + self._answer_queue = Queue() + self._proc_monitor: Optional[Process] = None + self._cmd_lock = RLock() + self._cleanup_lock = RLock() def _init_mplayer_bin(self, mplayer_bin=None): if not mplayer_bin: @@ -103,6 +134,9 @@ class MediaMplayerPlugin(MediaPlugin): m_args = mplayer_args or [] args = [self.mplayer_bin] + self._mplayer_bin_default_args + if self._fullscreen and '-fs' not in args: + args.append('-fs') + for arg in (*self.args, *m_args): if arg not in args: args.append(arg) @@ -116,45 +150,45 @@ class MediaMplayerPlugin(MediaPlugin): popen_args['env'] = self._env self._player = subprocess.Popen(args, **popen_args) - threading.Thread(target=self._process_monitor()).start() + self._proc_monitor = Process(target=self._listener, name='mplayer-monitor') + self._proc_monitor.start() def _build_actions(self): """Populates the actions list by introspecting the mplayer executable""" - self._actions = {} - mplayer = subprocess.Popen( - [self.mplayer_bin, '-input', 'cmdlist'], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - ) - def args_pprint(txt): lc = txt.lower() if lc[0] == '[': return f'{lc[1:-1]}=None' return lc - while True: - if not mplayer.stdout: - break + self._actions = {} + with subprocess.Popen( + [self.mplayer_bin, '-input', 'cmdlist'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) as mplayer: + while True: + if not mplayer.stdout: + break - line = mplayer.stdout.readline() - if not line: - break - line = line.decode() - if line[0].isupper(): - continue - args = line.split() - cmd_name = args.pop(0) - arguments = ', '.join([args_pprint(a) for a in args]) - self._actions[cmd_name] = f'{cmd_name}({arguments})' + line = mplayer.stdout.readline() + if not line: + break + + line = line.decode() + if line[0].isupper(): + continue + + args = line.split() + cmd_name = args.pop(0) + arguments = ', '.join([args_pprint(a) for a in args]) + self._actions[cmd_name] = f'{cmd_name}({arguments})' def _exec( self, cmd, *args, mplayer_args=None, prefix=None, wait_for_response=False - ): + ) -> Optional[dict]: cmd_name = cmd - response = None - if cmd_name in {'loadfile', 'loadlist'}: self._init_mplayer(mplayer_args) else: @@ -170,62 +204,129 @@ class MediaMplayerPlugin(MediaPlugin): ).encode() if not self._player: - self.logger.warning('Cannot send command %s: player unavailable', cmd) - return + self.logger.debug('Cannot send command %s: player unavailable', cmd) + return None - if not self._player.stdin: + if not (self._player.stdin and self._player.stdin.writable()): self.logger.warning( 'Could not communicate with the mplayer process: the stdin is closed' ) - return + return None - self._player.stdin.write(cmd) - self._player.stdin.flush() + # Make sure that the response queue is empty before waiting for a new response + while not self._answer_queue.empty(): + self._answer_queue.get() + + self.logger.debug('mplayer interface:: Sending command: %s', cmd) + + with self._cmd_lock: + try: + self._player.stdin.write(cmd) + self._player.stdin.flush() + except BrokenPipeError: + self.logger.info('The MPlayer process has terminated') + self._cleanup() + return None + except Exception as e: + self.logger.warning( + 'Failed to send command %s: %s: %s', cmd, type(e).__name__, e + ) + return None if cmd_name in {'loadfile', 'loadlist'}: self._post_event(NewPlayingMediaEvent, resource=args[0]) - elif cmd_name == 'pause': - self._post_event(MediaPauseEvent) - elif cmd_name == 'quit': - self._player.terminate() - self._player.wait() - try: - self._player.kill() - except Exception: - pass - self._player = None if not wait_for_response: - return + return None - if not (self._player and self._player.stdout): - self.logger.warning( - 'Could not communicate with the mplayer process: the stdout is closed' + # Get the response from the queue + try: + ret, status = self._answer_queue.get( + block=True, timeout=self._mplayer_timeout ) - return + self._status = status + except Empty: + self.logger.warning('No response from mplayer for command %s', cmd) + return None - poll = select.poll() - poll.register(self._player.stdout, select.POLLIN) - last_read_time = time.time() + return ret - while time.time() - last_read_time < self._mplayer_timeout: - result = poll.poll(0) - if result: - if not self._player: + def _process_answer(self, answer: dict): + for k, v in answer.items(): + if k == 'pause': + if v and self._status.state == PlayerState.PLAY: + self._status.state = PlayerState.PAUSE + self._post_event(MediaPauseEvent) + elif not v: + if self._status.state == PlayerState.PAUSE: + self._post_event(MediaResumeEvent) + elif self._status.state == PlayerState.STOP: + self._post_event(MediaPlayEvent) + + self._status.state = PlayerState.PLAY + elif k == 'filename': + self._status.filename = v + elif k == 'path': + self._status.path = v + self._status.url = ( + 'file://' if os.path.isfile(v) else '' + ) + self._status.path + elif k == 'fullscreen': + self._status.fullscreen = v + elif k == 'mute': + self._status.mute = v + elif k == 'percent_pos': + self._status.percent_pos = v + elif k == 'time_pos': + self._status.position = v + elif k == 'volume': + self._status.volume = v + elif k == 'length': + self._status.duration = v + + self._answer_queue.put((answer, self._status)) + + def _status_checker(self): + try: + while self._player and self._player.stdin and self._player.stdin.writable(): + try: + self._get_property('filename') + except (IOError, ValueError, KeyboardInterrupt): + break + finally: + time.sleep(1) + except Exception as e: + self.logger.warning('mplayer status checker process failed: %s', e) + + def _listener(self): + status_checker = Process( + target=self._status_checker, name='mplayer-status-checker' + ) + + try: + status_checker.start() + + while ( + self._player and self._player.stdout and self._player.stdout.readable() + ): + try: + buf = self._player.stdout.readline() + except (IOError, ValueError, KeyboardInterrupt): break - buf = self._player.stdout.readline() line = buf.decode() if isinstance(buf, bytes) else buf - last_read_time = time.time() + self.logger.debug('mplayer interface:: Received line: %s', buf) + + if not line: + break if line.startswith('ANS_'): - m = re.match('^([^=]+)=(.*)$', line[4:]) + m = re.match(r'^([^=]+)=(.*)\s*$', line[4:]) if not m: self.logger.warning('Unexpected response: %s', line) break k, v = m.group(1), m.group(2) - v = v.strip() if v == 'yes': v = True elif v == 'no': @@ -233,13 +334,31 @@ class MediaMplayerPlugin(MediaPlugin): try: if isinstance(v, str): - v = eval(v) # pylint: disable=eval-used - except Exception: + v = json.loads(v) + except (TypeError, ValueError): pass - response = {k: v} + self._process_answer({k: v}) + elif line.startswith('Starting playback'): + self._status.state = PlayerState.PLAY + self._post_event(MediaPlayEvent) + finally: + if status_checker and status_checker.is_alive(): + status_checker.terminate() + status_checker.join(timeout=5) + try: + status_checker.kill() + except Exception: + pass - return response + if self._player: + self._player.wait() + try: + self.quit() + except Exception: + pass + + self._player = None @action def execute(self, cmd, args=None): @@ -259,28 +378,16 @@ class MediaMplayerPlugin(MediaPlugin): for a in sorted(self._actions.keys()) ] - def _process_monitor(self): - def _thread(): - if not self._player: - return - - self._mplayer_stopped_event.clear() - self._player.wait() - try: - self.quit() - except Exception: - pass - - self._post_event(MediaStopEvent) - self._mplayer_stopped_event.set() - self._player = None - - return _thread - - @staticmethod - def _post_event(evt_type, **evt): - bus = get_bus() - bus.post(evt_type(player='local', plugin='media.mplayer', **evt)) + def _post_event(self, evt_type, **evt): + self._bus.post( + evt_type( + player='local', + plugin='media.mplayer', + resource=evt.pop('resource', self._status.url), + title=self._status.title or self._status.filename, + **evt, + ) + ) @action def play( @@ -288,6 +395,7 @@ class MediaMplayerPlugin(MediaPlugin): resource: str, subtitles: Optional[str] = None, mplayer_args: Optional[List[str]] = None, + **_, ): """ Play a resource. @@ -309,8 +417,6 @@ class MediaMplayerPlugin(MediaPlugin): resource = resource[7:] self._exec('loadfile', resource, mplayer_args=mplayer_args) - self._post_event(MediaPlayEvent, resource=resource) - if self.volume: self.set_volume(volume=self.volume) @@ -320,34 +426,72 @@ class MediaMplayerPlugin(MediaPlugin): def pause(self, *_, **__): """Toggle the paused state""" self._exec('pause') - self._post_event(MediaPauseEvent) return self.status() @action def stop(self, *_, **__): """Stop the playback""" - # return self._exec('stop') - self.quit() - return self.status() + return self.quit() + + def _cleanup(self): + with self._cleanup_lock: + if self._player: + self._player.terminate() + self._player.wait() + try: + self._player.kill() + except Exception: + pass + + self._player = None + self._post_event(MediaStopEvent) + + if self._proc_monitor and os.getpid() != self._proc_monitor.pid: + try: + self._proc_monitor.terminate() + except Exception as e: + if self._proc_monitor or self._proc_monitor.is_alive(): + self.logger.warning( + 'Failed to terminate MPlayer monitor process: %s', e + ) + + if self._proc_monitor: + try: + self._proc_monitor.join(timeout=5) + except AssertionError: # Can only join a child process + pass + + try: + self._proc_monitor.kill() + except Exception: + pass + + self._proc_monitor = None @action def quit(self, *_, **__): """Quit the player""" self._exec('quit') - self._post_event(MediaStopEvent) + self._cleanup() return self.status() @action - def voldown(self, *_, step=5.0, **__): + def voldown(self, *_, step=10.0, **__): """Volume down by (default: 10)%""" - self._exec('volume', -step * 10) - return self.status() + volume = (self._get_property('volume') or {}).get('volume') + if volume is None: + return self.status() + + return self.set_volume(volume=volume - step) @action - def volup(self, *_, step=5.0, **__): + def volup(self, *_, step=10.0, **__): """Volume up by (default: 10)%""" - self._exec('volume', step * 10) - return self.status() + volume = (self._get_property('volume') or {}).get('volume') + if volume is None: + return self.status() + + return self.set_volume(volume=volume + step) @action def back(self, *_, offset=30.0, **__): @@ -364,9 +508,7 @@ class MediaMplayerPlugin(MediaPlugin): @action def toggle_subtitles(self, *_, **__): """Toggle the subtitles visibility""" - response: dict = ( - self.get_property('sub_visibility').output or {} # type: ignore - ) + response: dict = self._get_property('sub_visibility') or {} subs = response.get('sub_visibility') self._exec('sub_visibility', int(not subs)) return self.status() @@ -390,9 +532,9 @@ class MediaMplayerPlugin(MediaPlugin): :param index: (1-based) index of the subtitles track to remove. """ if index is None: - self._exec('sub_remove') + self._exec('sub_remove', prefix='pausing_keep_force') else: - self._exec('sub_remove', index) + self._exec('sub_remove', index, prefix='pausing_keep_force') return self.status() @@ -416,28 +558,30 @@ class MediaMplayerPlugin(MediaPlugin): @action def mute(self, *_, **__): """Toggle mute state""" - self._exec('mute') + self._exec('mute', prefix='pausing_keep_force') return self.status() @action def seek(self, position: float, *_, **__): """ - Seek backward/forward by the specified number of seconds + Alias for :meth:`.set_position` :param position: Number of seconds relative to the current cursor - :type position: int """ - self.step_property('time_pos', position) - return self.status() + return self.set_position(position) @action def set_position(self, position: float, *_, **__): """ - Seek backward/forward to the specified absolute position + Set the playback position. :param position: Number of seconds from the start - :type position: int """ + # cur_pos = (self._get_property('time_pos') or {}).get('time_pos') + # if cur_pos is None: + # return self.status() + + # self.set_property('time_pos', position - cur_pos) self.set_property('time_pos', position) return self.status() @@ -447,9 +591,8 @@ class MediaMplayerPlugin(MediaPlugin): Set the volume :param volume: Volume value between 0 and 100 - :type volume: float """ - self._exec('volume', volume) + self._exec('volume', max(0, min(100, volume)), 1, prefix='pausing_keep_force') return self.status() @action @@ -461,9 +604,26 @@ class MediaMplayerPlugin(MediaPlugin): Example:: - output = { - "state": "play" # or "stop" or "pause" - } + .. code-block:: javascript + + { + "duration": 300.0, // in seconds + "filename": "video.mp4", + "fullscreen": false, + "mute": false, + "name": "video.mp4", + "path": "/path/to/video.mp4", + "pause": false, + "percent_pos": 30.0, + "position": 90.0, // in seconds + "seekable": true, + "state": "play", // or "stop" or "pause" + "title": "My Video", + "volume": 50.0, + "volume_max": 100.0, + "url": "file:///path/to/video.mp4", + } + """ if not self._player: @@ -513,20 +673,14 @@ class MediaMplayerPlugin(MediaPlugin): return status - @action - def get_property( + def _get_property( self, property: str, # pylint: disable=redefined-builtin args: Optional[Collection[str]] = None, - ): - """ - Get a player property (e.g. pause, fullscreen etc.). See - https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the - available properties - """ - + ) -> dict: args = args or [] - response = Response(output={}) + response = {} + errors = [] result = ( self._exec( @@ -539,18 +693,47 @@ class MediaMplayerPlugin(MediaPlugin): or {} ) - for k, v in result.items(): - if k == 'ERROR' and v not in response.errors: - if not isinstance(response.errors, list): - response.errors = [] - response.errors.append(f'{property}{args}: {v}') - else: - if not isinstance(response.output, dict): - response.output = {} - response.output[k] = v + if not result: + return response + for k, v in result.items(): + if k == 'ERROR' and v not in errors: + self._handle_property_error(property, args, v, errors) + else: + response[k] = v + + assert not errors, f'get_property errors: {errors}' return response + def _handle_property_error( + self, + property: str, # pylint: disable=redefined-builtin + args: Optional[Collection[str]], + error: str, + errors: List[str], + ): + if error == 'PROPERTY_UNAVAILABLE': + # This is a workaround to detect the end-of-file event. + # When get_property('filename') returns PROPERTY_UNAVAILABLE + # it means that the player is no longer playing anything + if property == 'filename' and self._status.state != PlayerState.STOP: + self.quit() + else: + errors.append(f'{property}{args}: {error}') + + @action + def get_property( + self, + property: str, # pylint: disable=redefined-builtin + args: Optional[Collection[str]] = None, + ): + """ + Get a player property (e.g. pause, fullscreen etc.). See + https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the + available properties + """ + return self._get_property(property, args=args) + @action def set_property( self, diff --git a/platypush/plugins/media/mpv/__init__.py b/platypush/plugins/media/mpv/__init__.py index 97bf3b5e..7a6b02aa 100644 --- a/platypush/plugins/media/mpv/__init__.py +++ b/platypush/plugins/media/mpv/__init__.py @@ -31,8 +31,6 @@ class MediaMpvPlugin(MediaPlugin): self, args: Optional[Dict[str, Any]] = None, fullscreen: bool = False, **kwargs ): """ - Create the MPV wrapper. - :param args: Default arguments that will be passed to the mpv executable as a key-value dict (names without the `--` prefix). See `man mpv` for available options.