From 89d618b35f02f38a2b7d7a95637ebcf508e23161 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 2 Apr 2024 16:18:11 +0200 Subject: [PATCH] [WIP] `music.mopidy` refactor, initial backend rewrite. --- docs/source/backends.rst | 1 - .../source/platypush/backend/music.mopidy.rst | 6 - .../source/platypush/plugins/music.mopidy.rst | 5 + docs/source/plugins.rst | 1 + platypush/backend/music/mopidy/__init__.py | 324 ----- platypush/plugins/music/__init__.py | 15 +- platypush/plugins/music/mopidy/__init__.py | 1076 +++++++++++++++++ platypush/plugins/music/mopidy/_client.py | 486 ++++++++ platypush/plugins/music/mopidy/_common.py | 4 + platypush/plugins/music/mopidy/_conf.py | 23 + platypush/plugins/music/mopidy/_exc.py | 10 + platypush/plugins/music/mopidy/_playlist.py | 33 + platypush/plugins/music/mopidy/_status.py | 47 + platypush/plugins/music/mopidy/_sync.py | 50 + platypush/plugins/music/mopidy/_task.py | 58 + platypush/plugins/music/mopidy/_track.py | 43 + .../music/mopidy/manifest.yaml | 4 +- platypush/plugins/music/mpd/__init__.py | 19 +- platypush/schemas/mopidy.py | 325 +++++ platypush/utils/threads.py | 7 +- 20 files changed, 2198 insertions(+), 339 deletions(-) delete mode 100644 docs/source/platypush/backend/music.mopidy.rst create mode 100644 docs/source/platypush/plugins/music.mopidy.rst delete mode 100644 platypush/backend/music/mopidy/__init__.py create mode 100644 platypush/plugins/music/mopidy/__init__.py create mode 100644 platypush/plugins/music/mopidy/_client.py create mode 100644 platypush/plugins/music/mopidy/_common.py create mode 100644 platypush/plugins/music/mopidy/_conf.py create mode 100644 platypush/plugins/music/mopidy/_exc.py create mode 100644 platypush/plugins/music/mopidy/_playlist.py create mode 100644 platypush/plugins/music/mopidy/_status.py create mode 100644 platypush/plugins/music/mopidy/_sync.py create mode 100644 platypush/plugins/music/mopidy/_task.py create mode 100644 platypush/plugins/music/mopidy/_track.py rename platypush/{backend => plugins}/music/mopidy/manifest.yaml (92%) create mode 100644 platypush/schemas/mopidy.py diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 4b76793a..26b632d1 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -8,7 +8,6 @@ Backends platypush/backend/http.rst platypush/backend/midi.rst - platypush/backend/music.mopidy.rst platypush/backend/music.spotify.rst platypush/backend/nodered.rst platypush/backend/redis.rst diff --git a/docs/source/platypush/backend/music.mopidy.rst b/docs/source/platypush/backend/music.mopidy.rst deleted file mode 100644 index 69815030..00000000 --- a/docs/source/platypush/backend/music.mopidy.rst +++ /dev/null @@ -1,6 +0,0 @@ -``music.mopidy`` -================================== - -.. automodule:: platypush.backend.music.mopidy - :members: - diff --git a/docs/source/platypush/plugins/music.mopidy.rst b/docs/source/platypush/plugins/music.mopidy.rst new file mode 100644 index 00000000..6d8deabf --- /dev/null +++ b/docs/source/platypush/plugins/music.mopidy.rst @@ -0,0 +1,5 @@ +``music.mopidy`` +================ + +.. automodule:: platypush.plugins.music.mopidy + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 1a63faaa..f016efcc 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -84,6 +84,7 @@ Plugins platypush/plugins/ml.cv.rst platypush/plugins/mobile.join.rst platypush/plugins/mqtt.rst + platypush/plugins/music.mopidy.rst platypush/plugins/music.mpd.rst platypush/plugins/music.snapcast.rst platypush/plugins/music.spotify.rst diff --git a/platypush/backend/music/mopidy/__init__.py b/platypush/backend/music/mopidy/__init__.py deleted file mode 100644 index 6914c931..00000000 --- a/platypush/backend/music/mopidy/__init__.py +++ /dev/null @@ -1,324 +0,0 @@ -import json -import re -import threading - -import websocket - -from platypush.backend import Backend -from platypush.message.event.music import ( - MusicPlayEvent, - MusicPauseEvent, - MusicStopEvent, - NewPlayingTrackEvent, - PlaylistChangeEvent, - VolumeChangeEvent, - PlaybackConsumeModeChangeEvent, - PlaybackSingleModeChangeEvent, - PlaybackRepeatModeChangeEvent, - PlaybackRandomModeChangeEvent, - MuteChangeEvent, - SeekChangeEvent, -) - - -# noinspection PyUnusedLocal -class MusicMopidyBackend(Backend): - """ - This backend listens for events on a Mopidy music server streaming port. - Since this backend leverages the Mopidy websocket interface it is only - compatible with Mopidy and not with other MPD servers. Please use the - :class:`platypush.backend.music.mpd.MusicMpdBackend` for a similar polling - solution if you're not running Mopidy or your instance has the websocket - interface or web port disabled. - - Requires: - - * A Mopidy instance running with the HTTP service enabled. - - """ - - def __init__(self, host='localhost', port=6680, **kwargs): - super().__init__(**kwargs) - - self.host = host - self.port = int(port) - self.url = 'ws://{}:{}/mopidy/ws'.format(host, port) - self._msg_id = 0 - self._ws = None - self._latest_status = {} - self._reconnect_thread = None - self._connected_event = threading.Event() - - try: - self._latest_status = self._get_tracklist_status() - except Exception as e: - self.logger.warning('Unable to get mopidy status: {}'.format(str(e))) - - @staticmethod - def _parse_track(track, pos=None): - if not track: - return {} - - conv_track = track.get('track', {}).copy() - conv_track['id'] = track.get('tlid') - conv_track['file'] = conv_track['uri'] - del conv_track['uri'] - - if 'artists' in conv_track: - conv_track['artist'] = conv_track['artists'][0].get('name') - del conv_track['artists'] - - if 'name' in conv_track: - conv_track['title'] = conv_track['name'] - del conv_track['name'] - - if 'album' in conv_track: - conv_track['album'] = conv_track['album']['name'] - - if 'length' in conv_track: - conv_track['time'] = ( - conv_track['length'] / 1000 - if conv_track['length'] - else conv_track['length'] - ) - del conv_track['length'] - - if pos is not None: - conv_track['pos'] = pos - - if '__model__' in conv_track: - del conv_track['__model__'] - - return conv_track - - def _communicate(self, msg): - if isinstance(msg, str): - msg = json.loads(msg) - - self._msg_id += 1 - msg['jsonrpc'] = '2.0' - msg['id'] = self._msg_id - msg = json.dumps(msg) - - ws = websocket.create_connection(self.url) - ws.send(msg) - response = json.loads(ws.recv()).get('result') - ws.close() - return response - - def _get_tracklist_status(self): - return { - 'repeat': self._communicate({'method': 'core.tracklist.get_repeat'}), - 'random': self._communicate({'method': 'core.tracklist.get_random'}), - 'single': self._communicate({'method': 'core.tracklist.get_single'}), - 'consume': self._communicate({'method': 'core.tracklist.get_consume'}), - } - - def _on_msg(self): - def hndl(*args): - msg = args[1] if len(args) > 1 else args[0] - msg = json.loads(msg) - event = msg.get('event') - if not event: - return - - status = {} - track = msg.get('tl_track', {}) - - if event == 'track_playback_paused': - status['state'] = 'pause' - track = self._parse_track(track) - if not track: - return - self.bus.post( - MusicPauseEvent(status=status, track=track, plugin_name='music.mpd') - ) - elif event == 'track_playback_resumed': - status['state'] = 'play' - track = self._parse_track(track) - if not track: - return - self.bus.post( - MusicPlayEvent(status=status, track=track, plugin_name='music.mpd') - ) - elif event == 'track_playback_ended' or ( - event == 'playback_state_changed' and msg.get('new_state') == 'stopped' - ): - status['state'] = 'stop' - track = self._parse_track(track) - self.bus.post( - MusicStopEvent(status=status, track=track, plugin_name='music.mpd') - ) - elif event == 'track_playback_started': - track = self._parse_track(track) - if not track: - return - - status['state'] = 'play' - status['position'] = 0.0 - status['time'] = track.get('time') - self.bus.post( - NewPlayingTrackEvent( - status=status, track=track, plugin_name='music.mpd' - ) - ) - elif event == 'stream_title_changed': - m = re.match(r'^\s*(.+?)\s+-\s+(.*)\s*$', msg.get('title', '')) - if not m: - return - - track['artist'] = m.group(1) - track['title'] = m.group(2) - status['state'] = 'play' - status['position'] = 0.0 - self.bus.post( - NewPlayingTrackEvent( - status=status, track=track, plugin_name='music.mpd' - ) - ) - elif event == 'volume_changed': - status['volume'] = msg.get('volume') - self.bus.post( - VolumeChangeEvent( - volume=status['volume'], - status=status, - track=track, - plugin_name='music.mpd', - ) - ) - elif event == 'mute_changed': - status['mute'] = msg.get('mute') - self.bus.post( - MuteChangeEvent( - mute=status['mute'], - status=status, - track=track, - plugin_name='music.mpd', - ) - ) - elif event == 'seeked': - status['position'] = msg.get('time_position') / 1000 - self.bus.post( - SeekChangeEvent( - position=status['position'], - status=status, - track=track, - plugin_name='music.mpd', - ) - ) - elif event == 'tracklist_changed': - tracklist = [ - self._parse_track(t, pos=i) - for i, t in enumerate( - self._communicate({'method': 'core.tracklist.get_tl_tracks'}) - ) - ] - - self.bus.post( - PlaylistChangeEvent(changes=tracklist, plugin_name='music.mpd') - ) - elif event == 'options_changed': - new_status = self._get_tracklist_status() - if new_status['random'] != self._latest_status.get('random'): - self.bus.post( - PlaybackRandomModeChangeEvent( - state=new_status['random'], plugin_name='music.mpd' - ) - ) - if new_status['repeat'] != self._latest_status['repeat']: - self.bus.post( - PlaybackRepeatModeChangeEvent( - state=new_status['repeat'], plugin_name='music.mpd' - ) - ) - if new_status['single'] != self._latest_status['single']: - self.bus.post( - PlaybackSingleModeChangeEvent( - state=new_status['single'], plugin_name='music.mpd' - ) - ) - if new_status['consume'] != self._latest_status['consume']: - self.bus.post( - PlaybackConsumeModeChangeEvent( - state=new_status['consume'], plugin_name='music.mpd' - ) - ) - - self._latest_status = new_status - - return hndl - - def _retry_connect(self): - def reconnect(): - while not self.should_stop() and not self._connected_event.is_set(): - try: - self._connect() - except Exception as e: - self.logger.warning('Error on websocket reconnection: %s', e) - - self._connected_event.wait(timeout=10) - - self._reconnect_thread = None - - if not self._reconnect_thread or not self._reconnect_thread.is_alive(): - self._reconnect_thread = threading.Thread(target=reconnect) - self._reconnect_thread.start() - - def _on_error(self): - def hndl(*args): - error = args[1] if len(args) > 1 else args[0] - ws = args[0] if len(args) > 1 else None - self.logger.warning('Mopidy websocket error: {}'.format(error)) - if ws: - ws.close() - - return hndl - - def _on_close(self): - def hndl(*_): - self._connected_event.clear() - self._ws = None - self.logger.warning('Mopidy websocket connection closed') - - if not self.should_stop(): - self._retry_connect() - - return hndl - - def _on_open(self): - def hndl(*_): - self._connected_event.set() - self.logger.info('Mopidy websocket connected') - - return hndl - - def _connect(self): - if not self._ws: - self._ws = websocket.WebSocketApp( - self.url, - on_open=self._on_open(), - on_message=self._on_msg(), - on_error=self._on_error(), - on_close=self._on_close(), - ) - - self._ws.run_forever() - - def run(self): - super().run() - self.logger.info( - 'Started tracking Mopidy events backend on {}:{}'.format( - self.host, self.port - ) - ) - self._connect() - - def on_stop(self): - self.logger.info('Received STOP event on the Mopidy backend') - if self._ws: - self._ws.close() - - self.logger.info('Mopidy backend terminated') - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/music/__init__.py b/platypush/plugins/music/__init__.py index 6c3b04c7..70d43798 100644 --- a/platypush/plugins/music/__init__.py +++ b/platypush/plugins/music/__init__.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Optional +from typing import Dict, Iterable, Optional from platypush.plugins import Plugin, action @@ -107,5 +107,18 @@ class MusicPlugin(Plugin, ABC): def search(self, query, **kwargs): raise NotImplementedError() + @action + def get_images(self, resources: Iterable[str], **__) -> Dict[str, Optional[str]]: + """ + Get the images for a list of URIs. + + .. note:: This is an optional action, and it may not be implemented by all plugins. + If the plugin doesn't implement this action, it will return an empty dictionary. + + :param uris: List of URIs. + :return: Dictionary in the form ``{uri: image_url}``. + """ + return {} + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/music/mopidy/__init__.py b/platypush/plugins/music/mopidy/__init__.py new file mode 100644 index 00000000..e5c2bd85 --- /dev/null +++ b/platypush/plugins/music/mopidy/__init__.py @@ -0,0 +1,1076 @@ +from typing import Dict, Iterable, List, Optional, Union + +from platypush.plugins import RunnablePlugin, action +from platypush.plugins.media import PlayerState +from platypush.schemas.mopidy import ( + MopidyAlbumSchema, + MopidyArtistSchema, + MopidyDirectorySchema, + MopidyFilterSchema, + MopidyPlaylistSchema, + MopidyStatusSchema, + MopidyTrackSchema, +) +from platypush.utils import wait_for_either + +from ._client import MopidyClient +from ._common import DEFAULT_TIMEOUT +from ._conf import MopidyConfig +from ._exc import EmptyTrackException +from ._playlist import MopidyPlaylist +from ._status import MopidyStatus +from ._sync import PlaylistSync +from ._task import MopidyTask +from ._track import MopidyTrack + + +class MusicMopidyPlugin(RunnablePlugin): + """ + This plugin allows you to track the events from a Mopidy instance + and control it through the Mopidy HTTP API. + + Requires: + + * A Mopidy instance running with the HTTP service enabled. + + """ + + def __init__( + self, + host: str = 'localhost', + port: int = 6680, + ssl: bool = False, + timeout: Optional[float] = DEFAULT_TIMEOUT, + **kwargs, + ): + """ + :param host: Mopidy host (default: localhost). + :param port: Mopidy HTTP port (default: 6680). + :param ssl: Set to True if the Mopidy server is running on HTTPS. + :param timeout: Default timeout for the Mopidy requests (default: 20s). + """ + super().__init__(**kwargs) + + self.config = MopidyConfig(host=host, port=port, ssl=ssl, timeout=timeout) + self._status = MopidyStatus() + self._tasks: Dict[int, MopidyTask] = {} + self._client: Optional[MopidyClient] = None + self._playlist_sync = PlaylistSync() + + def _exec(self, *msgs: dict, **kwargs): + assert self._client, "Mopidy client not running" + return self._client.exec( + *msgs, timeout=kwargs.pop('timeout', self.config.timeout) + ) + + def _exec_with_status(self, *msgs: dict, **kwargs): + self._exec(*msgs, **kwargs) + return self._dump_status() + + def _dump_status(self): + assert self._client, "Mopidy client not running" + return MopidyStatusSchema().dump(self._client.status) + + def _dump_results(self, results: List[dict]) -> List[dict]: + schema_by_type = { + 'artist': MopidyArtistSchema(), + 'album': MopidyAlbumSchema(), + 'directory': MopidyDirectorySchema(), + 'playlist': MopidyPlaylistSchema(), + 'track': MopidyTrackSchema(), + } + + return [ + { + **( + MopidyTrack.parse(item).to_dict() # type: ignore + if item['type'] == 'track' + else schema_by_type[item['type']].dump(item) + ), + 'type': item['type'], + } + for item in results + ] + + def _dump_search_results(self, results: List[dict]) -> List[dict]: + return self._dump_results( + [ + { + **item, + 'type': res_type, + } + for search_provider in results + for res_type in ['artist', 'album', 'track'] + for item in search_provider.get(res_type + 's', []) + ] + ) + + def _lookup(self, *uris: str) -> Dict[str, List[MopidyTrack]]: + if not uris: + return {} + + if len(uris) > 1: + # If more than one URI is specified, we need to call only + # library.lookup, as playlist.lookup does not support multiple + # URIs. + result = self._exec( + {'method': 'core.library.lookup', 'uris': uris}, + )[0] + else: + # Otherwise, search both in the library and in the playlist + # controllers. + uri = uris[0] + result = self._exec( + {'method': 'core.playlists.lookup', 'uri': uri}, + {'method': 'core.library.lookup', 'uris': [uri]}, + ) + result = { + uri: ( + result[0].get('tracks', []) + if result[0] + else list(result[1].values())[0] + ) + } + + ret = {} + for uri, tracks in result.items(): + ret[uri] = [] + for track in tracks: + parsed_track = MopidyTrack.parse(track) + if parsed_track: + ret[uri].append(parsed_track) + + return ret + + def _add( + self, + *resources: str, + position: Optional[int] = None, + clear: bool = False, + lookup: bool = True, + ): + batch_size = 50 + results = self._lookup(*resources).values() + ret = [] + uris = ( + [track.uri for tracks in results for track in tracks if track and track.uri] + if lookup + else list(resources) + ) + + with self._playlist_sync: + if clear: + self.clear() + + for i in range(0, len(uris), batch_size): + ret += self._exec( + { + 'method': 'core.tracklist.add', + 'uris': uris[i : i + batch_size], + 'at_position': position, + } + )[0] + + self.logger.info('Loaded %d/%d tracks', len(ret), len(uris)) + + return ret + + def _get_playlist(self, playlist: str, with_tracks: bool = False) -> MopidyPlaylist: + playlists = self._get_playlists() + pl_by_name = {p.name: p for p in playlists} + pl_by_uri = {p.uri: p for p in playlists} + pl = pl_by_uri.get(playlist, pl_by_name.get(playlist)) + assert pl, f"Playlist {playlist} not found" + + if with_tracks: + pl.tracks = self._get_playlist_tracks(playlist) + + return pl + + def _get_playlist_tracks(self, playlist: str) -> List[MopidyTrack]: + playlists = self._get_playlists() + pl_by_name = {p.name: p for p in playlists} + pl_by_uri = {p.uri: p for p in playlists} + pl = pl_by_uri.get(playlist, pl_by_name.get(playlist)) + assert pl, f"Playlist {playlist} not found" + + tracks = self._exec({'method': 'core.playlists.get_items', 'uri': pl.uri})[0] + assert tracks is not None, f"Playlist {playlist} not found" + + ret = [] + for track in tracks: + parsed_track = MopidyTrack.parse(track) + if parsed_track: + ret.append(parsed_track) + + return ret + + def _get_playlists(self, **__) -> List[MopidyPlaylist]: + return [ + MopidyPlaylist.parse(pl) + for pl in self._exec({'method': 'core.playlists.as_list'})[0] + ] + + def _save_playlist(self, playlist: MopidyPlaylist): + return self._exec( + { + 'method': 'core.playlists.save', + 'playlist': { + '__model__': 'Playlist', + 'uri': playlist.uri, + 'name': playlist.name, + 'tracks': [ + { + '__model__': 'Track', + 'uri': track.uri, + } + for track in playlist.tracks + ], + }, + } + )[0] + + @action + def play( + self, + resource: Optional[str] = None, + position: Optional[int] = None, + track_id: Optional[int] = None, + **__, + ): + """ + Start playback, or play a resource by URI. + + :param resource: Resource path/URI. If not specified, it will resume the + playback if paused/stopped, otherwise it will start playing the + selected track. + :param track_id: The ID of track (or ``tlid``) in the current playlist + that should be played, if you want to play a specific track already + loaded in the current playlist. + :param position: Position number (0-based) of the track in the current + playlist that should be played. + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if resource: + ret = self._add(resource, position=0) + if not ret: + self.logger.warning('Failed to add %s to the tracklist', resource) + elif isinstance(ret, dict): + track_id = ret.get('tlid') + elif position is not None: + tracklist = self._exec({'method': 'core.tracklist.get_tl_tracks'})[0] + if position < 0 or position >= len(tracklist): + self.logger.warning( + 'Position %d is out of bounds for the current tracklist', position + ) + return None + + track_id = tracklist[position]['tlid'] + + return self._exec_with_status( + {'method': 'core.playback.play', 'tlid': track_id} + ) + + @action + def play_pos(self, pos: int): + """ + Play a track in the current playlist by position number. + + Legacy alias for :meth:`.play` with a ``position`` parameter. + + :param pos: Position number (0-based). + """ + return self.play(position=pos) + + @action + def load(self, playlist: str, play: bool = True): + """ + Load/play a playlist. + + This method will clear the current playlist and load the tracks from the + given playlist. + + You should usually prefer :meth:`.add` to this method, as it is more + general-purpose (``load`` only works with playlists). This method exists + mainly for compatibility with the MPD plugin. + + :param playlist: Playlist URI. + :param play: Start playback after loading the playlist (default: True). + """ + self._add(playlist, clear=True) + if play: + self.play() + + @action + def lookup(self, resources: Iterable[str], **__): + """ + Lookup (one or) resources by URI. + + Given a list of URIs, this method will return a dictionary in the form + ``{uri: [track1, track2, ...]}``. + + :param resource: Resource URI(s). + :return: .. schema:: mopidy.MopidyTrackSchema(many=True) + """ + return { + uri: [track.to_dict() for track in tracks] + for uri, tracks in self._lookup(*resources).items() + } + + @action + def pause(self, **__): + """ + Pause the playback. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status({'method': 'core.playback.pause'}) + + @action + def stop(self, **__): # type: ignore + """ + Stop the playback. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status({'method': 'core.playback.stop'}) + + @action + def prev(self, **__): + """ + Play the previous track. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status({'method': 'core.playback.previous'}) + + @action + def next(self, **__): + """ + Play the next track. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status({'method': 'core.playback.next'}) + + @action + def add( + self, + resource: Union[str, Iterable[str]], + *_, + position: Optional[int] = None, + **__, + ): + """ + Add a resource (track, album, artist, folder etc.) to the current + playlist. + + :param resource: Resource URI(s). + :param position: Position (0-based) where the track(s) will be inserted + (default: end of the playlist). + :return: The list of tracks added to the queue. + .. schema:: mopidy.MopidyTrackSchema(many=True) + """ + resources = [resource] if isinstance(resource, str) else resource + tracks = [ + MopidyTrack.parse(track) + for track in self._add(*resources, position=position) + ] + return [track.to_dict() for track in tracks if track] + + @action + def pause_if_playing(self, **__): + """ + Pause the playback if it's currently playing. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if self._status.state == PlayerState.PLAY: + return self.pause() + + return self._dump_status() + + @action + def play_if_paused(self, **__): + """ + Resume the playback if it's currently paused. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if self._status.state == PlayerState.PAUSE: + return self.play() + + return self._dump_status() + + @action + def play_if_paused_or_stopped(self): + """ + Resume the playback if it's currently paused or stopped. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if self._status.state in {PlayerState.PAUSE, PlayerState.STOP}: + return self.play() + + return self._dump_status() + + @action + def play_or_stop(self): + """ + Play if the playback is stopped, stop if it's playing, otherwise resume + playback. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if self._status.state == PlayerState.PLAY: + return self.stop() + + return self.play() + + @action + def set_volume(self, volume: int, **__): + """ + Set the volume. + + :param volume: Volume level (0-100). + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status( + {'method': 'core.mixer.set_volume', 'volume': volume} + ) + + @action + def volup(self, step: int = 5, **__): + """ + Increase the volume by a given step. + + :param step: Volume step (default: 5%). + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self.set_volume(volume=min(100, self._status.volume + step)) + + @action + def voldown(self, step: int = 5, **__): + """ + Decrease the volume by a given step. + + :param step: Volume step (default: 5%). + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self.set_volume(volume=max(0, self._status.volume - step)) + + @action + def random(self, value: Optional[bool] = None, **__): + """ + Set the random mode. + + :param value: Random mode. If not specified, it will toggle the current + random mode. + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if value is None: + value = not self._status.random + + return self._exec_with_status( + {'method': 'core.tracklist.set_random', 'value': bool(value)} + ) + + @action + def repeat(self, value: Optional[bool] = None, **__): + """ + Set the repeat mode. + + :param value: Repeat mode. If not specified, it will toggle the current + repeat mode. + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if value is None: + value = not self._status.repeat + + return self._exec_with_status( + {'method': 'core.tracklist.set_repeat', 'value': bool(value)} + ) + + @action + def consume(self, value: Optional[bool] = None, **__): + """ + Set the consume mode. + + :param value: Consume mode. If not specified, it will toggle the current + consume mode. + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if value is None: + value = not self._status.consume + + return self._exec_with_status( + {'method': 'core.tracklist.set_consume', 'value': bool(value)} + ) + + @action + def single(self, value: Optional[bool] = None, **__): + """ + Set the single mode. + + :param value: Single mode. If not specified, it will toggle the current + single mode. + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if value is None: + value = not self._status.single + + return self._exec_with_status( + {'method': 'core.tracklist.set_single', 'value': bool(value)} + ) + + @action + def shuffle(self, **__): + """ + Shuffle the current playlist. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status({'method': 'core.tracklist.shuffle'}) + + @action + def save(self, name: str, **__): + """ + Save the current tracklist to a new playlist with the given name. + + :param name: New playlist name. + """ + return self._exec({'method': 'core.playlists.save', 'name': name})[0] + + @action + def delete( + self, + positions: Optional[Iterable[int]] = None, + uris: Optional[Iterable[str]] = None, + ): + """ + Delete tracks from the current tracklist. + + .. note:: At least one of the ``positions`` or ``uris`` parameters must + be specified. + + :param positions: (0-based) positions of the tracks to be deleted. + :param uris: URIs of the tracks to be deleted. + """ + assert ( + positions or uris + ), "At least one of 'positions' or 'uris' must be specified" + criteria = {} + if positions: + assert self._client, "Mopidy client not running" + positions = set(positions) + criteria['tlid'] = list( + { + track.track_id + for i, track in enumerate(self._client.tracks) + if i in positions + } + ) + if uris: + criteria['uri'] = list(uris) + + return self._exec( + { + 'method': 'core.tracklist.remove', + 'criteria': criteria, + } + )[0] + + @action + def move( + self, + start: Optional[int] = None, + end: Optional[int] = None, + position: Optional[int] = None, + from_pos: Optional[int] = None, + to_pos: Optional[int] = None, + **__, + ): + """ + Move one or more tracks in the current playlist to a new position. + + You can pass either: + + - ``start``, ``end`` and ``position`` to move a slice of tracks + from ``start`` to ``end`` to the new position ``position``. + - ``from_pos`` and ``to_pos`` to move a single track from + ``from_pos`` to ``to_pos``. + + .. note: Positions are 0-based (i.e. the first track has position 0). + + :param start: Start position of the slice of tracks to be moved. + :param end: End position of the slice of tracks to be moved. + :param position: New position where the tracks will be inserted. + :param from_pos: Alias for ``start`` - it only works with one track at + the time. Maintained for compatibility with + :meth:`platypush.plugins.music.mpd.MusicMpdPlugin.move`. + :param to_pos: Alias for ``position`` - it only works with one track at + the time. Maintained for compatibility with + :meth:`platypush.plugins.music.mpd.MusicMpdPlugin.move`. + """ + assert (from_pos is not None and to_pos is not None) or ( + start is not None and end is not None and position is not None + ), 'Either "start", "end" and "position", or "from_pos" and "to_pos" must be specified' + + if (from_pos is not None) and (to_pos is not None): + start, end, position = from_pos, from_pos, to_pos + + ret = self._exec( + { + 'method': 'core.tracklist.move', + 'start': start, + 'end': end, + 'to_position': position, + } + )[0] + + if self._client: + self._client.refresh_status(with_tracks=True) + return ret + + @action + def clear(self, **__): + """ + Clear the current playlist. + """ + self._exec_with_status({'method': 'core.tracklist.clear'}) + + @action + def seek(self, position: float, **__): + """ + Seek to a given position in the current track. + + :param position: Position in seconds. + :return: .. schema:: mopidy.MopidyStatusSchema + """ + return self._exec_with_status( + {'method': 'core.playback.seek', 'time_position': int(position * 1000)} + ) + + @action + def back(self, delta: float = 10, **__): + """ + Seek back by a given number of seconds. + + :param delta: Number of seconds to seek back (default: 10s). + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if self._status.playing_pos is None: + return self._dump_status() + + return self.seek(position=self._status.playing_pos - delta) + + @action + def forward(self, delta: float = 10, **__): + """ + Seek forward by a given number of seconds. + + :param delta: Number of seconds to seek forward (default: 10s). + :return: .. schema:: mopidy.MopidyStatusSchema + """ + if self._status.playing_pos is None: + return self._dump_status() + + return self.seek(position=self._status.playing_pos + delta) + + @action + def status(self, **__): + """ + Get the current Mopidy status. + + :return: .. schema:: mopidy.MopidyStatusSchema + """ + assert self._client, "Mopidy client not running" + self._client.refresh_status() + return self._dump_status() + + @action + def current_track(self, **__): + """ + Get the current track. + + :return: .. schema:: mopidy.MopidyTrackSchema + """ + assert self._client, "Mopidy client not running" + if not self._client.status.track: + return None + + return self._client.status.track.to_dict() + + @action + def get_tracks(self, **__): + """ + Get the current playlist tracks. + + :return: .. schema:: mopidy.MopidyTrackSchema(many=True) + """ + assert self._client, "Mopidy client not running" + return [t.to_dict() for t in self._client.tracks] + + @action + def get_playlists(self, **__): + """ + Get the available playlists. + + :return: .. schema:: mopidy.MopidyPlaylistSchema(many=True) + """ + return MopidyPlaylistSchema().dump(self._get_playlists(), many=True) + + @action + def get_playlist(self, playlist: str, **__): + """ + Get the items in a playlist. + + :param playlist: Playlist URI. + :param only_tracks: If True, only the tracks will be returned, otherwise + the full playlist object will be returned - including name and other + metadata. + :return: .. schema:: mopidy.MopidyTrackSchema(many=True) + """ + tracks = self._get_playlist_tracks(playlist) + tracks_by_uri = {t.uri: t for t in tracks if t.uri} + looked_up = self._lookup(*tracks_by_uri.keys()) + return [ + track.to_dict() + for track in [ + (looked_up[uri][0] if looked_up.get(uri) else track) + for uri, track in tracks_by_uri.items() + ] + ] + + @action + def get_playlist_uri_schemes(self, **__): + """ + Get the available playlist URI schemes. + + :return: List of available playlist URI schemes. + """ + return self._exec({'method': 'core.playlists.get_uri_schemes'})[0] + + @action + def create_playlist(self, name: str, uri_scheme: str = 'm3u', **__): + """ + Create a new playlist. + + :param name: Playlist name. + :param uri_scheme: URI scheme for the playlist (default: ``m3u``). + You can get a full list of the available URI schemes that support + playlist creation on the Mopidy instance by calling + :meth:`.get_playlist_uri_schemes`. + :return: .. schema:: mopidy.MopidyPlaylistSchema + """ + return MopidyPlaylistSchema().dump( + self._exec( + { + 'method': 'core.playlists.create', + 'name': name, + 'uri_scheme': uri_scheme, + } + )[0] + ) + + @action + def delete_playlist(self, playlist: str, **__): + """ + Delete a playlist. + + :param playlist: Playlist URI. + :return: ``True`` if the playlist was deleted, ``False`` otherwise. + """ + return self._exec({'method': 'core.playlists.delete', 'uri': playlist})[0] + + @action + def add_to_playlist( + self, + playlist: str, + resources: Iterable[str], + position: Optional[int] = None, + allow_duplicates: bool = False, + **__, + ): + """ + Add tracks to a playlist. + + :param playlist: Playlist URI/name. + :param resources: List of track URIs. + :param position: Position where the tracks will be inserted (default: + end of the playlist). + :param allow_duplicates: If True, the tracks will be added even if they + are already present in the playlist (default: False). + :return: The modified playlist. + .. schema:: mopidy.MopidyPlaylistSchema + """ + pl = self._get_playlist(playlist, with_tracks=True) + + if not allow_duplicates: + existing_uris = {t.uri for t in pl.tracks} + resources = [t for t in resources if t not in existing_uris] + + new_tracks = [MopidyTrack(uri=t) for t in resources] + if position is not None: + pl.tracks = pl.tracks[:position] + new_tracks + pl.tracks[position:] + else: + pl.tracks += new_tracks + + self._save_playlist(pl) + return pl.to_dict() + + @action + def remove_from_playlist( + self, + playlist: str, + resources: Optional[Iterable[Union[str, int]]] = None, + from_pos: Optional[int] = None, + to_pos: Optional[int] = None, + **__, + ): + """ + Remove tracks from a playlist. + + This action can work in three different ways: + + - If the ``resources`` parameter is specified, and it contains + strings, it will remove the tracks matching the provided URIs. + - If the ``resources`` parameter is specified, and it contains + integers, it will remove the tracks in the specified positions. + - If the ``from_pos`` and ``to_pos`` parameters are specified, it + will remove the tracks in the specified range (inclusive). + + .. note: Positions are 0-based (i.e. the first track has position 0). + + :param playlist: Playlist URI/name. + :param tracks: List of track URIs. + :param from_pos: Start position of the slice of tracks to be removed. + :param to_pos: End position of the slice of tracks to be removed. + :return: The modified playlist. + .. schema:: mopidy.MopidyPlaylistSchema + """ + assert resources or ( + from_pos is not None and to_pos is not None + ), "Either 'tracks', or 'positions', or 'from_pos' and 'to_pos' must be specified" + + pl = self._get_playlist(playlist, with_tracks=True) + + if resources: + resources = set(resources) + positions = { + i + for i, t in enumerate(pl.tracks) + if t.uri in resources or i in resources + } + + pl.tracks = [t for i, t in enumerate(pl.tracks) if i not in positions] + elif from_pos is not None and to_pos is not None: + from_pos, to_pos = (min(from_pos, to_pos), max(from_pos, to_pos)) + pl.tracks = pl.tracks[: from_pos - 1] + pl.tracks[to_pos:] + + self._save_playlist(pl) + return pl.to_dict() + + @action + def playlist_move( + self, + playlist: str, + start: Optional[int] = None, + end: Optional[int] = None, + position: Optional[int] = None, + from_pos: Optional[int] = None, + to_pos: Optional[int] = None, + **__, + ): + """ + Move tracks in a playlist. + + This action can work in two different ways: + + - If the ``start``, ``end`` and ``position`` parameters are + specified, it will move an individual track from the position + ``start`` to the position ``end`` to the new position + ``position``. + + - If the ``from_pos``, ``to_pos`` and ``position`` parameters are + specified, it will move the tracks in the specified range + (inclusive) to the new position ``position``. + + .. note: Positions are 0-based (i.e. the first track has position 0). + + :param playlist: Playlist URI. + :param start: Start position of the slice of tracks to be moved. + :param end: End position of the slice of tracks to be moved. + :param position: New position where the tracks will be inserted. + :return: The modified playlist. + .. schema:: mopidy.MopidyPlaylistSchema + """ + assert (start is not None and end is not None and position is not None) or ( + from_pos is not None and to_pos is not None + ), "Either 'start', 'end' and 'position', or 'from_pos' and 'to_pos' must be specified" + + pl = self._get_playlist(playlist, with_tracks=True) + + if from_pos is not None and to_pos is not None: + from_pos, to_pos = (min(from_pos, to_pos), max(from_pos, to_pos)) + pl.tracks = ( + pl.tracks[:from_pos] + + pl.tracks[to_pos : to_pos + 1] + + pl.tracks[from_pos + 1 : to_pos] + + pl.tracks[from_pos : from_pos + 1] + + pl.tracks[to_pos + 1 :] + ) + elif start is not None and end is not None and position is not None: + start, end = (min(start, end), max(start, end)) + if start == end: + end += 1 + + if start < position: + pl.tracks = ( + pl.tracks[:start] + + pl.tracks[end : end + (position - start)] + + pl.tracks[start:end] + + pl.tracks[end + (position - start) :] + ) + else: + pl.tracks = ( + pl.tracks[:position] + + pl.tracks[start:end] + + pl.tracks[position:start] + + pl.tracks[end:] + ) + + self._save_playlist(pl) + return pl.to_dict() + + @action + def playlist_clear(self, playlist: str, **__): + """ + Remove all the tracks from a playlist. + + :param playlist: Playlist URI/name. + :return: The modified playlist. + .. schema:: mopidy.MopidyPlaylistSchema + """ + pl = self._get_playlist(playlist) + pl.tracks = [] + self._save_playlist(pl) + return pl.to_dict() + + @action + def rename_playlist(self, playlist: str, new_name: str, **__): + """ + Rename a playlist. + + :param playlist: Playlist URI/name. + :param new_name: New playlist name. + :return: The modified playlist. + .. schema:: mopidy.MopidyPlaylistSchema + """ + pl = self._get_playlist(playlist, with_tracks=True) + pl.name = new_name + self._save_playlist(pl) + return pl.to_dict() + + @action + def get_images(self, resources: Iterable[str], **__) -> Dict[str, Optional[str]]: + """ + Get the images for a list of URIs. + + :param resources: List of URIs. + :return: Dictionary in the form ``{uri: image_url}``. + """ + return { + uri: next(iter(images or []), {}).get('uri') + for uri, images in self._exec( + {'method': 'core.library.get_images', 'uris': list(resources)} + )[0].items() + } + + @action + def search( # pylint: disable=redefined-builtin + self, filter: dict, exact: bool = False, **__ + ): + """ + Search items that match the given query. + + :param filter: .. schema:: mopidy.MopidyFilterSchema + :param exact: If True, the search will only return exact matches. + :return: A list of result, including: + + - Tracks + .. schema:: mopidy.MopidyTrackSchema(many=True) + - Albums + .. schema:: mopidy.MopidyAlbumSchema(many=True) + - Artists + .. schema:: mopidy.MopidyArtistSchema(many=True) + + """ + filter = dict(MopidyFilterSchema().load(filter) or {}) + uris = filter.pop('uris', None) + kwargs = { + 'exact': exact, + 'query': filter, + **({'uris': uris} if uris else {}), + } + + return self._dump_search_results( + self._exec({'method': 'core.library.search', **kwargs})[0] + ) + + @action + def find( # pylint: disable=redefined-builtin + self, filter: dict, exact: bool = False, **__ + ): + """ + Alias for :meth:`search`, for MPD compatibility. + + :param filter: .. schema:: mopidy.MopidyFilterSchema + :param exact: If True, the search will only return exact matches. + :return: .. schema:: mopidy.MopidyTrackSchema(many=True) + """ + return self.search(filter=filter, exact=exact) + + @action + def browse(self, uri: Optional[str] = None): + """ + Browse the items under the specified URI. + + :param uri: URI to browse (default: root directory). + :return: A list of result under the specified resource, including: + + - Directories + .. schema:: mopidy.MopidyDirectorySchema(many=True) + - Tracks + .. schema:: mopidy.MopidyTrackSchema(many=True) + - Albums + .. schema:: mopidy.MopidyAlbumSchema(many=True) + - Artists + .. schema:: mopidy.MopidyArtistSchema(many=True) + + """ + return self._dump_results( + self._exec({'method': 'core.library.browse', 'uri': uri})[0] + ) + + def main(self): + while not self.should_stop(): + try: + with MopidyClient( + config=self.config, + status=self._status, + stop_event=self._should_stop, + playlist_sync=self._playlist_sync, + tasks=self._tasks, + ) as self._client: + self._client.start() + wait_for_either(self._should_stop, self._client.closed_event) + finally: + self._client = None + self.wait_stop(10) + + +__all__ = ['EmptyTrackException', 'MusicMopidyPlugin', 'MopidyStatus', 'MopidyTrack'] + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/music/mopidy/_client.py b/platypush/plugins/music/mopidy/_client.py new file mode 100644 index 00000000..d27129ed --- /dev/null +++ b/platypush/plugins/music/mopidy/_client.py @@ -0,0 +1,486 @@ +import json +import logging +import re +import time +from dataclasses import asdict +from queue import Empty +from threading import Event, RLock, Thread +from typing import Dict, Generator, List, Optional, Type + +import websocket + +from platypush.context import get_bus +from platypush.message.event.music import ( + MusicEvent, + MusicPauseEvent, + MusicPlayEvent, + MusicStopEvent, + MuteChangeEvent, + NewPlayingTrackEvent, + PlaybackConsumeModeChangeEvent, + PlaybackRandomModeChangeEvent, + PlaybackRepeatModeChangeEvent, + PlaybackSingleModeChangeEvent, + PlaylistChangeEvent, + SeekChangeEvent, + VolumeChangeEvent, +) +from platypush.plugins.media import PlayerState + +from ._common import DEFAULT_TIMEOUT +from ._conf import MopidyConfig +from ._status import MopidyStatus +from ._sync import PlaylistSync +from ._task import MopidyTask +from ._track import MopidyTrack + + +class MopidyClient(Thread): + """ + Thread that listens for Mopidy events and posts them to the bus. + """ + + def __init__( + self, + config: MopidyConfig, + status: MopidyStatus, + stop_event: Event, + playlist_sync: PlaylistSync, + tasks: Dict[int, MopidyTask], + **_, + ): + super().__init__(name='platypush:mopidy:listener') + + self.logger = logging.getLogger('platypush:mopidy:listener') + self.config = config + self._status = status + self._stop_event = stop_event + self._playlist_sync = playlist_sync + self._tasks = tasks + self._refresh_in_progress = Event() + self._refresh_lock = RLock() + self._req_lock = RLock() + self._close_lock = RLock() + self._tracks: List[MopidyTrack] = [] + self._msg_id = 0 + self._ws = None + self.connected_event = Event() + self.closed_event = Event() + + @property + def _bus(self): + return get_bus() + + @property + def status(self): + return self._status + + @property + def tracks(self): + return self._tracks + + def should_stop(self): + return self._stop_event.is_set() + + def wait_stop(self, timeout: Optional[float] = None): + self._stop_event.wait(timeout=timeout) + + def make_task(self, method: str, **args: dict) -> MopidyTask: + with self._req_lock: + self._msg_id += 1 + task = MopidyTask( + id=self._msg_id, + method=method, + args=args or {}, + ) + + self._tasks[task.id] = task + return task + + def send(self, *tasks: MopidyTask): + """ + Send a list of tasks to the Mopidy server. + """ + assert self._ws, 'Websocket not connected' + + for task in tasks: + with self._req_lock: + task.send(self._ws) + + def gather( + self, + *tasks: MopidyTask, + timeout: Optional[float] = DEFAULT_TIMEOUT, + ) -> Generator: + t_start = time.time() + + for task in tasks: + remaining_timeout = ( + max(0, timeout - (time.time() - t_start)) if timeout else None + ) + + if not self._tasks.get(task.id): + yield None + + try: + ret = self._tasks[task.id].get_response(timeout=remaining_timeout) + assert not isinstance(ret, Exception), ret + self.logger.debug('Got response for %s: %s', task, ret) + yield ret + except Empty as e: + t = self._tasks.get(task.id) + err = 'Mopidy request timeout' + if t: + err += f' - method: {t.method} args: {t.args}' + + raise TimeoutError(err) from e + finally: + self._tasks.pop(task.id, None) + + def exec(self, *msgs: dict, timeout: Optional[float] = DEFAULT_TIMEOUT) -> list: + tasks = [self.make_task(**msg) for msg in msgs] + for task in tasks: + self.send(task) + + return list(self.gather(*tasks, timeout=timeout)) + + def refresh_status( # pylint: disable=too-many-branches + self, timeout: Optional[float] = DEFAULT_TIMEOUT, with_tracks: bool = False + ): + if self._refresh_in_progress.is_set(): + return + + events = [] + + try: + with self._refresh_lock: + self._refresh_in_progress.set() + # Refresh the tracklist attributes + opts = ('repeat', 'random', 'single', 'consume') + ret = self.exec( + *[ + *[{'method': f'core.tracklist.get_{opt}'} for opt in opts], + {'method': 'core.playback.get_current_tl_track'}, + {'method': 'core.playback.get_state'}, + {'method': 'core.mixer.get_volume'}, + {'method': 'core.playback.get_time_position'}, + *( + [{'method': 'core.tracklist.get_tl_tracks'}] + if with_tracks + else [] + ), + ], + timeout=timeout, + ) + + for i, opt in enumerate(opts): + new_value = ret[i] + if opt == 'random' and self._status.random != new_value: + events.append( + (PlaybackRandomModeChangeEvent, {'state': new_value}) + ) + if opt == 'repeat' and self._status.repeat != new_value: + events.append( + (PlaybackRepeatModeChangeEvent, {'state': new_value}) + ) + if opt == 'single' and self._status.single != new_value: + events.append( + (PlaybackSingleModeChangeEvent, {'state': new_value}) + ) + if opt == 'consume' and self._status.consume != new_value: + events.append( + (PlaybackConsumeModeChangeEvent, {'state': new_value}) + ) + + setattr(self._status, opt, new_value) + + # Get remaining info + track = MopidyTrack.parse(ret[4]) + state, volume, t = ret[5:8] + + if track: + idx = self.exec( + { + 'method': 'core.tracklist.index', + 'tlid': track.track_id, + }, + timeout=timeout, + )[0] + + self._status.track = track + self._status.duration = track.time + if idx is not None: + self._status.playing_pos = self._status.track.playlist_pos = idx + + if track != self._status.track and state != 'stopped': + events.append((NewPlayingTrackEvent, {})) + + if state != self._status.state: + if state == 'paused': + self._status.state = PlayerState.PAUSE + events.append((MusicPauseEvent, {})) + elif state == 'playing': + self._status.state = PlayerState.PLAY + events.append((MusicPlayEvent, {})) + elif state == 'stopped': + self._status.state = PlayerState.STOP + events.append((MusicStopEvent, {})) + + if volume != self._status.volume: + self._status.volume = volume + events.append((VolumeChangeEvent, {'volume': volume})) + + if t != self._status.time: + self._status.time = t / 1000 + events.append((SeekChangeEvent, {'position': self._status.time})) + + if with_tracks: + self._tracks = [ # type: ignore + MopidyTrack.parse({**t, 'playlist_pos': i}) + for i, t in enumerate(ret[8]) + ] + + for evt in events: + self._post_event(evt[0], **evt[1]) + finally: + self._refresh_in_progress.clear() + + def _refresh_status( + self, timeout: Optional[float] = DEFAULT_TIMEOUT, with_tracks: bool = False + ): + """ + Refresh the status from the Mopidy server. + + It runs in a separate thread because the status refresh logic runs in + synchronous mode, and it would block the main thread preventing the + listener from receiving new messages. + + Also, an event+reenrant lock mechanism is used to ensure that only one + refresh task is running at a time. + """ + if self._refresh_in_progress.is_set(): + return + + with self._refresh_lock: + Thread( + target=self.refresh_status, + kwargs={'timeout': timeout, 'with_tracks': with_tracks}, + daemon=True, + ).start() + + def _post_event(self, evt_cls: Type[MusicEvent], **kwargs): + self._bus.post( + evt_cls( + status=asdict(self._status), + track=asdict(self._status.track) if self._status.track else None, + plugin_name='music.mopidy', + **kwargs, + ) + ) + + def _handle_error(self, msg: dict): + msg_id = msg.get('id') + err = msg.get('error') + if not err: + return + + err_data = err.get('data', {}) + tb = err_data.get('traceback') + self.logger.warning( + 'Mopidy error: %s: %s: %s', + err.get('message'), + err_data.get('type'), + err_data.get('message'), + ) + if tb: + self.logger.warning(tb) + + if msg_id: + task = self._tasks.get(msg_id) + if task: + task.put_response( + RuntimeError(err.get('message') + ': ' + err_data.get('message')) + ) + + def on_pause(self, *_, **__): + self._status.state = PlayerState.PAUSE + self._post_event(MusicPauseEvent) + + def on_resume(self, *_, **__): + self._status.state = PlayerState.PLAY + self._post_event(MusicPlayEvent) + + def on_start(self, *_, **__): + self._refresh_status() + + def on_end(self, *_, **__): + self._refresh_status() + + def on_state_change(self, msg: dict, *_, **__): + state = msg.get('new_state') + if state == PlayerState.PLAY: + self._status.state = PlayerState.PLAY + self._post_event(MusicPlayEvent) + elif state == PlayerState.PAUSE: + self._status.state = PlayerState.PAUSE + self._post_event(MusicPauseEvent) + elif state == PlayerState.STOP: + self._status.state = PlayerState.STOP + self._post_event(MusicStopEvent) + + def on_title_change(self, msg: dict, *_, track: MopidyTrack, **__): + title = msg.get('title', '') + m = re.match(r'^\s*(.+?)\s+-\s+(.*)\s*$', title) + if not m: + return + + track.artist = m.group(1) + track.title = m.group(2) + self._post_event(NewPlayingTrackEvent) + + def on_volume_change(self, msg: dict, *_, **__): + volume = msg.get('volume') + if volume is None: + return + + self._status.volume = volume + self._post_event(VolumeChangeEvent, volume=volume) + + def on_mute_change(self, msg: dict, *_, **__): + mute = msg.get('mute') + if mute is None: + return + + self._status.mute = mute + self._post_event(MuteChangeEvent, mute=mute) + + def on_seek(self, msg: dict, *_, **__): + position = msg.get('time_position') + if position is None: + return + + self._status.time = position / 1000 + self._post_event(SeekChangeEvent, position=self._status.time) + + def on_tracklist_change(self, *_, **__): + should_proceed = self._playlist_sync.wait_for_loading(timeout=2) + if not should_proceed: + return + + self.logger.debug('Tracklist changed, refreshing changes') + self._refresh_status(with_tracks=True) + self._post_event(PlaylistChangeEvent) + + def on_options_change(self, *_, **__): + self._refresh_status() + + def _on_msg(self, *args): + msg = args[1] if len(args) > 1 else args[0] + msg = json.loads(msg) + msg_id = msg.get('id') + event = msg.get('event') + track: Optional[MopidyTrack] = None + self.logger.debug('Received Mopidy message: %s', msg) + + if msg.get('error'): + self._handle_error(msg) + return + + if msg_id: + task = self._tasks.get(msg_id) + if task: + task.put_response(msg) + return + + if not event: + return + + if msg.get('tl_track'): + track = self._status.track = MopidyTrack.parse(msg['tl_track']) + + hndl = self._msg_handlers.get(event) + if not hndl: + return + + hndl(self, msg, track=track) + + def _on_error(self, *args): + error = args[1] if len(args) > 1 else args[0] + ws = args[0] if len(args) > 1 else None + self.logger.warning('Mopidy websocket error: %s', error) + if ws: + ws.close() + + def _on_close(self, *_): + self.connected_event.clear() + self.closed_event.set() + + if self._ws: + try: + self._ws.close() + except Exception as e: + self.logger.debug(e, exc_info=True) + finally: + self._ws = None + + self.logger.warning('Mopidy websocket connection closed') + + def _on_open(self, *_): + self.connected_event.set() + self.closed_event.clear() + self.logger.info('Mopidy websocket connected') + self._refresh_status(with_tracks=True) + + def _connect(self): + if not self._ws: + self._ws = websocket.WebSocketApp( + self.config.url, + on_open=self._on_open, + on_message=self._on_msg, + on_error=self._on_error, + on_close=self._on_close, + ) + + self._ws.run_forever() + + def run(self): + while not self.should_stop(): + try: + self._connect() + except Exception as e: + self.logger.warning( + 'Error on websocket connection: %s', e, exc_info=True + ) + finally: + self.connected_event.clear() + self.closed_event.set() + self.wait_stop(10) + + def stop(self): + with self._close_lock: + if self._ws: + self._ws.close() + self._ws = None + + def __enter__(self): + return self + + def __exit__(self, *_): + self.stop() + + _msg_handlers = { + 'track_playback_paused': on_pause, + 'playback_state_changed': on_state_change, + 'track_playback_resumed': on_resume, + 'track_playback_ended': on_end, + 'track_playback_started': on_start, + 'stream_title_changed': on_title_change, + 'volume_changed': on_volume_change, + 'mute_changed': on_mute_change, + 'seeked': on_seek, + 'tracklist_changed': on_tracklist_change, + 'options_changed': on_options_change, + } + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/music/mopidy/_common.py b/platypush/plugins/music/mopidy/_common.py new file mode 100644 index 00000000..1be03661 --- /dev/null +++ b/platypush/plugins/music/mopidy/_common.py @@ -0,0 +1,4 @@ +from typing import Final + + +DEFAULT_TIMEOUT: Final[float] = 20 diff --git a/platypush/plugins/music/mopidy/_conf.py b/platypush/plugins/music/mopidy/_conf.py new file mode 100644 index 00000000..cd745ce2 --- /dev/null +++ b/platypush/plugins/music/mopidy/_conf.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from typing import Optional + +from ._common import DEFAULT_TIMEOUT + + +@dataclass +class MopidyConfig: + """ + Mopidy configuration. + """ + + host: str = 'localhost' + port: int = 6680 + ssl: bool = False + timeout: Optional[float] = DEFAULT_TIMEOUT + + @property + def url(self) -> str: + return f'ws{"s" if self.ssl else ""}://{self.host}:{self.port}/mopidy/ws' + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/music/mopidy/_exc.py b/platypush/plugins/music/mopidy/_exc.py new file mode 100644 index 00000000..41c6b99f --- /dev/null +++ b/platypush/plugins/music/mopidy/_exc.py @@ -0,0 +1,10 @@ +class MopidyException(Exception): + """ + Base class for all Mopidy exceptions. + """ + + +class EmptyTrackException(MopidyException, ValueError): + """ + Raised when a parsed track is empty. + """ diff --git a/platypush/plugins/music/mopidy/_playlist.py b/platypush/plugins/music/mopidy/_playlist.py new file mode 100644 index 00000000..141b5bc5 --- /dev/null +++ b/platypush/plugins/music/mopidy/_playlist.py @@ -0,0 +1,33 @@ +from dataclasses import dataclass, field +from datetime import datetime +from typing import List, Optional + +from platypush.schemas.mopidy import MopidyPlaylistSchema + +from ._track import MopidyTrack + + +@dataclass +class MopidyPlaylist: + """ + Model for a Mopidy playlist. + """ + + uri: str + name: str + last_modified: Optional[datetime] = None + tracks: List[MopidyTrack] = field(default_factory=list) + type: str = "playlist" + + @classmethod + def parse(cls, playlist: dict) -> "MopidyPlaylist": + """ + Parse a Mopidy playlist from a dictionary received from the Mopidy API. + """ + return cls(**MopidyPlaylistSchema().load(playlist)) # type: ignore + + def to_dict(self) -> dict: + """ + Convert the Mopidy playlist to a dictionary. + """ + return dict(MopidyPlaylistSchema().dump(self)) diff --git a/platypush/plugins/music/mopidy/_status.py b/platypush/plugins/music/mopidy/_status.py new file mode 100644 index 00000000..6fa6f6b7 --- /dev/null +++ b/platypush/plugins/music/mopidy/_status.py @@ -0,0 +1,47 @@ +from dataclasses import asdict, dataclass +from typing import Optional + +from platypush.plugins.media import PlayerState +from platypush.schemas.mopidy import MopidyStatusSchema + +from ._track import MopidyTrack + + +@dataclass +class MopidyStatus: + """ + A dataclass to hold the status of the Mopidy client. + """ + + state: PlayerState = PlayerState.STOP + volume: float = 0 + consume: bool = False + random: bool = False + repeat: bool = False + single: bool = False + mute: bool = False + time: Optional[float] = None + duration: Optional[float] = None + playing_pos: Optional[int] = None + track: Optional[MopidyTrack] = None + + def copy(self): + return MopidyStatus( + state=self.state, + volume=self.volume, + consume=self.consume, + random=self.random, + repeat=self.repeat, + single=self.single, + mute=self.mute, + time=self.time, + duration=self.duration, + playing_pos=self.playing_pos, + track=MopidyTrack(**asdict(self.track)) if self.track else None, + ) + + def to_dict(self): + """ + Convert the Mopidy status to a dictionary. + """ + return dict(MopidyStatusSchema().dump(self)) diff --git a/platypush/plugins/music/mopidy/_sync.py b/platypush/plugins/music/mopidy/_sync.py new file mode 100644 index 00000000..94727344 --- /dev/null +++ b/platypush/plugins/music/mopidy/_sync.py @@ -0,0 +1,50 @@ +from dataclasses import dataclass, field +from threading import Event, RLock +from typing import Optional + + +@dataclass +class PlaylistSync: + """ + Object used to synchronize playlist load/change events between threads. + """ + + _loading_lock: RLock = field(default_factory=RLock) + _loading: Event = field(default_factory=Event) + _loaded: Event = field(default_factory=Event) + + def wait_for_loading(self, timeout: Optional[float] = None): + """ + Wait for the playlist to be loaded. + + :param timeout: The maximum time to wait for the playlist to be loaded. + """ + # If the loading event is not set, no playlist change - we can proceed + # with notifying the event. + if not self._loading.is_set(): + return True + + # Wait for the full playlist to be loaded. + return self._loaded.wait(timeout) + + def __enter__(self): + """ + Called when entering a context manager to handle a playlist loading + session. + """ + self._loading_lock.acquire() + self._loading.set() + self._loaded.clear() + + def __exit__(self, *_): + """ + Called when exiting a context manager to handle a playlist loading + session. + """ + self._loading.clear() + self._loaded.set() + + try: + self._loading_lock.release() + except RuntimeError: + pass diff --git a/platypush/plugins/music/mopidy/_task.py b/platypush/plugins/music/mopidy/_task.py new file mode 100644 index 00000000..0ed96ab3 --- /dev/null +++ b/platypush/plugins/music/mopidy/_task.py @@ -0,0 +1,58 @@ +import json +from dataclasses import dataclass, field +from queue import Queue +from threading import Event +from typing import Any, Optional, Union + +from websocket import WebSocketApp + +from ._common import DEFAULT_TIMEOUT + + +@dataclass +class MopidyTask: + """ + A task to be executed by the Mopidy client. + """ + + id: int + method: str + args: dict = field(default_factory=dict) + response: Optional[Any] = None + response_ready: Event = field(default_factory=Event) + response_queue: Queue = field(default_factory=Queue) + + def to_dict(self): + return { + "jsonrpc": "2.0", + "id": self.id, + "method": self.method, + 'params': self.args, + } + + def __str__(self): + return json.dumps(self.to_dict()) + + def send(self, ws: WebSocketApp): + assert ws, "Websocket connection not established" + self.response_ready.clear() + ws.send(str(self)) + + def get_response(self, timeout: Optional[float] = DEFAULT_TIMEOUT) -> Any: + ret = self.response_queue.get(timeout=timeout) + if isinstance(ret, dict): + ret = ret.get('result') + + return ret + + def put_response(self, response: Union[dict, Exception]): + self.response = response + self.response_ready.set() + self.response_queue.put_nowait(response) + + def wait(self, timeout: Optional[float] = DEFAULT_TIMEOUT) -> bool: + return self.response_ready.wait(timeout=timeout) + + def run(self, ws: WebSocketApp, timeout: Optional[float] = DEFAULT_TIMEOUT): + self.send(ws) + return self.get_response(timeout=timeout) diff --git a/platypush/plugins/music/mopidy/_track.py b/platypush/plugins/music/mopidy/_track.py new file mode 100644 index 00000000..c359f2ba --- /dev/null +++ b/platypush/plugins/music/mopidy/_track.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass +from typing import Optional + +from platypush.schemas.mopidy import MopidyTrackSchema + +from ._exc import EmptyTrackException + + +@dataclass +class MopidyTrack: + """ + Model for a Mopidy track. + """ + + uri: str + artist: Optional[str] = None + title: Optional[str] = None + album: Optional[str] = None + artist_uri: Optional[str] = None + album_uri: Optional[str] = None + time: Optional[float] = None + playlist_pos: Optional[int] = None + track_id: Optional[int] = None + track_no: Optional[int] = None + date: Optional[str] = None + genre: Optional[str] = None + type: str = 'track' + + @classmethod + def parse(cls, track: dict) -> Optional["MopidyTrack"]: + """ + Parse a Mopidy track from a dictionary received from the Mopidy API. + """ + try: + return cls(**MopidyTrackSchema().load(track)) # type: ignore + except EmptyTrackException: + return None + + def to_dict(self) -> dict: + """ + Convert the Mopidy track to a dictionary. + """ + return dict(MopidyTrackSchema().dump(self)) diff --git a/platypush/backend/music/mopidy/manifest.yaml b/platypush/plugins/music/mopidy/manifest.yaml similarity index 92% rename from platypush/backend/music/mopidy/manifest.yaml rename to platypush/plugins/music/mopidy/manifest.yaml index c34a18da..4166f184 100644 --- a/platypush/backend/music/mopidy/manifest.yaml +++ b/platypush/plugins/music/mopidy/manifest.yaml @@ -13,5 +13,5 @@ manifest: platypush.message.event.music.VolumeChangeEvent: if the main volume has changed install: pip: [] - package: platypush.backend.music.mopidy - type: backend + package: platypush.plugins.music.mopidy + type: plugin diff --git a/platypush/plugins/music/mpd/__init__.py b/platypush/plugins/music/mpd/__init__.py index 717e3023..d74e46e3 100644 --- a/platypush/plugins/music/mpd/__init__.py +++ b/platypush/plugins/music/mpd/__init__.py @@ -21,6 +21,18 @@ class MusicMpdPlugin(MusicPlugin, RunnablePlugin): the original protocol and with support for multiple music sources through plugins (e.g. Spotify, TuneIn, Soundcloud, local files etc.). + .. note:: If you use Mopidy, and unless you have quite specific use-cases + (like you don't want to expose the Mopidy HTTP interface, or you have + some legacy automation that uses the MPD interface), you should use the + :class:`platypush.plugins.music.mopidy.MusicMopidyPlugin` plugin instead + of this. The Mopidy plugin provides a more complete and feature-rich + experience, as not all the features of Mopidy are available through the + MPD interface, and its API is 100% compatible with this plugin. Also, + this plugin operates a synchronous/polling logic because of the + limitations of the MPD protocol, while the Mopidy plugin, as it uses the + Mopidy Websocket API, can operate in a more efficient way and provide + real-time updates. + .. note:: As of Mopidy 3.0 MPD is an optional interface provided by the ``mopidy-mpd`` extension. Make sure that you have the extension installed and enabled on your instance to use this plugin if you want to @@ -645,11 +657,12 @@ class MusicMpdPlugin(MusicPlugin, RunnablePlugin): self._exec('rename', playlist, new_name) @action - def lsinfo(self, uri: Optional[str] = None): - """ - Returns the list of playlists and directories on the server. + def browse(self, uri: Optional[str] = None): """ + Browse the items under the specified URI. + :param uri: URI to browse (default: root directory). + """ return ( self._exec('lsinfo', uri, return_status=False) if uri diff --git a/platypush/schemas/mopidy.py b/platypush/schemas/mopidy.py new file mode 100644 index 00000000..1c217268 --- /dev/null +++ b/platypush/schemas/mopidy.py @@ -0,0 +1,325 @@ +from marshmallow import EXCLUDE, fields, post_dump, post_load, pre_dump, pre_load +from marshmallow.schema import Schema + +from platypush.plugins.media import PlayerState +from platypush.schemas import DateTime + + +class MopidyTrackSchema(Schema): + """ + Mopidy track schema. + """ + + uri = fields.String(required=True, metadata={"description": "Track URI"}) + file = fields.String( + metadata={"description": "Track URI, for MPD compatibility purposes"} + ) + artist = fields.String(missing=None, metadata={"description": "Artist name"}) + title = fields.String(missing=None, metadata={"description": "Track title"}) + album = fields.String(missing=None, metadata={"description": "Album name"}) + artist_uri = fields.String( + missing=None, metadata={"description": "Artist URI (if available)"} + ) + album_uri = fields.String( + missing=None, metadata={"description": "Album URI (if available)"} + ) + time = fields.Float( + missing=None, metadata={"description": "Track length (in seconds)"} + ) + playlist_pos = fields.Integer( + missing=None, + metadata={"description": "Track position in the tracklist/playlist"}, + ) + track_id = fields.Integer( + missing=None, metadata={"description": "Track ID in the current tracklist"} + ) + track_no = fields.Integer( + missing=None, metadata={"description": "Track number in the album"} + ) + date = fields.String(missing=None, metadata={"description": "Track release date"}) + genre = fields.String(missing=None, metadata={"description": "Track genre"}) + type = fields.Constant("track", metadata={"description": "Item type"}) + + @pre_load + def parse(self, track: dict, **_): + from platypush.plugins.music.mopidy import EmptyTrackException + + uri = (track or {}).get("uri", (track or {}).get("track", {}).get("uri")) + if not uri: + raise EmptyTrackException("Empty track") + + tlid = track.get("tlid") + playlist_pos = track.get("playlist_pos") + if track.get("track"): + track = track.get("track", {}) + + length = track.get("length", track.get("time", track.get("duration"))) + return { + "uri": uri, + "artist": next( + iter(item.get("name") for item in track.get("artists", [])), + None, + ), + "title": track.get("name"), + "album": track.get("album", {}).get("name"), + "artist_uri": next( + iter(item.get("uri") for item in track.get("artists", [])), None + ), + "album_uri": track.get("album", {}).get("uri"), + "time": length / 1000 if length is not None else None, + "playlist_pos": ( + track.get("playlist_pos") if playlist_pos is None else playlist_pos + ), + "date": track.get("date", track.get("album", {}).get("date")), + "track_id": tlid, + "track_no": track.get("track_no"), + "genre": track.get("genre"), + } + + @post_dump + def to_dict(self, track: dict, **_): + """ + Fill/move missing fields in the dictionary. + """ + return { + "file": track["uri"], + **track, + } + + +class MopidyStatusSchema(Schema): + """ + Mopidy status schema. + """ + + state = fields.Enum( + PlayerState, + required=True, + metadata={"description": "Player state"}, + ) + volume = fields.Float(metadata={"description": "Player volume (0-100)"}) + consume = fields.Boolean(metadata={"description": "Consume mode"}) + random = fields.Boolean(metadata={"description": "Random mode"}) + repeat = fields.Boolean(metadata={"description": "Repeat mode"}) + single = fields.Boolean(metadata={"description": "Single mode"}) + mute = fields.Boolean(metadata={"description": "Mute mode"}) + time = fields.Float(metadata={"description": "Current time (in seconds)"}) + playing_pos = fields.Integer( + metadata={"description": "Index of the currently playing track"} + ) + track = fields.Nested( + MopidyTrackSchema, missing=None, metadata={"description": "Current track"} + ) + + @post_dump + def post_dump(self, data: dict, **_): + """ + Post-dump hook. + """ + state = data.get("state") + if state: + data["state"] = getattr(PlayerState, state).value + return data + + +class MopidyPlaylistSchema(Schema): + """ + Mopidy playlist schema. + """ + + # pylint: disable=too-few-public-methods + class Meta: # type: ignore + """ + Mopidy playlist schema metadata. + """ + + unknown = EXCLUDE + + uri = fields.String(required=True, metadata={"description": "Playlist URI"}) + name = fields.String(required=True, metadata={"description": "Playlist name"}) + last_modified = DateTime(metadata={"description": "Last modified timestamp"}) + tracks = fields.List( + fields.Nested(MopidyTrackSchema), + missing=None, + metadata={"description": "Playlist tracks"}, + ) + type = fields.Constant("playlist", metadata={"description": "Item type"}) + + @pre_dump + def pre_dump(self, playlist, **_): + """ + Pre-dump hook. + """ + last_modified = ( + playlist.last_modified + if hasattr(playlist, "last_modified") + else playlist.get("last_modified") + ) + + if last_modified: + last_modified /= 1000 + if hasattr(playlist, "last_modified"): + playlist.last_modified = last_modified + else: + playlist["last_modified"] = last_modified + + return playlist + + +class MopidyArtistSchema(Schema): + """ + Mopidy artist schema. + """ + + uri = fields.String(required=True, metadata={"description": "Artist URI"}) + file = fields.String( + metadata={"description": "Artist URI, for MPD compatibility purposes"} + ) + name = fields.String(missing=None, metadata={"description": "Artist name"}) + artist = fields.String( + missing=None, + metadata={"description": "Same as name - for MPD compatibility purposes"}, + ) + type = fields.Constant("artist", metadata={"description": "Item type"}) + + @post_dump + def to_dict(self, artist: dict, **_): + """ + Fill/move missing fields in the dictionary. + """ + return { + "file": artist["uri"], + "artist": artist["name"], + **artist, + } + + +class MopidyAlbumSchema(Schema): + """ + Mopidy album schema. + """ + + uri = fields.String(required=True, metadata={"description": "Album URI"}) + file = fields.String( + metadata={"description": "Artist URI, for MPD compatibility purposes"} + ) + artist = fields.String(missing=None, metadata={"description": "Artist name"}) + album = fields.String( + missing=None, + metadata={"description": "Same as name - for MPD compatibility purposes"}, + ) + name = fields.String(missing=None, metadata={"description": "Album name"}) + artist_uri = fields.String(missing=None, metadata={"description": "Artist URI"}) + date = fields.String(missing=None, metadata={"description": "Album release date"}) + genre = fields.String(missing=None, metadata={"description": "Album genre"}) + + def parse(self, data: dict, **_): + assert data.get("uri"), "Album URI is required" + return { + "uri": data["uri"], + "artist": data.get("artist") + or next( + iter(item.get("name") for item in data.get("artists", [])), + None, + ), + "name": data.get("name"), + "artist_uri": data.get("artist_uri") + or next(iter(item.get("uri") for item in data.get("artists", [])), None), + "album_uri": data.get("album_uri") or data.get("album", {}).get("uri"), + "date": data.get("date", data.get("album", {}).get("date")), + "genre": data.get("genre"), + } + + @pre_load + def pre_load(self, album: dict, **_): + """ + Pre-load hook. + """ + return self.parse(album) + + @pre_dump + def pre_dump(self, album: dict, **_): + """ + Pre-dump hook. + """ + return self.parse(album) + + @post_dump + def to_dict(self, album: dict, **_): + """ + Fill/move missing fields in the dictionary. + """ + return { + "file": album["uri"], + "album": album["name"], + **album, + } + + +class MopidyDirectorySchema(Schema): + """ + Mopidy directory schema. + """ + + uri = fields.String(required=True, metadata={"description": "Directory URI"}) + name = fields.String(required=True, metadata={"description": "Directory name"}) + type = fields.Constant("directory", metadata={"description": "Item type"}) + + +class MopidyFilterSchema(Schema): + """ + Mopidy filter schema. + """ + + uris = fields.List(fields.String, metadata={"description": "Filter by URIs"}) + artist = fields.List(fields.String, metadata={"description": "Artist name(s)"}) + album = fields.List(fields.String, metadata={"description": "Album name(s)"}) + title = fields.List(fields.String, metadata={"description": "Track title(s)"}) + albumartist = fields.List( + fields.String, metadata={"description": "Album artist name(s)"} + ) + date = fields.List(fields.String, metadata={"description": "Track release date(s)"}) + genre = fields.List(fields.String, metadata={"description": "Genre(s)"}) + comment = fields.List(fields.String, metadata={"description": "Comment(s)"}) + disc_no = fields.List(fields.Integer, metadata={"description": "Disc number(s)"}) + musicbrainz_artistid = fields.List( + fields.String, metadata={"description": "MusicBrainz artist ID(s)"} + ) + musicbrainz_albumid = fields.List( + fields.String, metadata={"description": "MusicBrainz album ID(s)"} + ) + musicbrainz_trackid = fields.List( + fields.String, metadata={"description": "MusicBrainz album artist ID(s)"} + ) + any = fields.List( + fields.String, metadata={"description": "Generic search string(s)"} + ) + + @pre_load + def pre_load(self, data: dict, **_): + """ + Pre-load hook. + """ + for field_name, field in self.fields.items(): + value = data.get(field_name) + + # Back-compatibtility with MPD's single-value filters + if ( + value is not None + and isinstance(field, fields.List) + and isinstance(value, str) + ): + data[field_name] = [value] + + return data + + @post_load + def post_load(self, data: dict, **_): + """ + Post-load hook. + """ + title = data.pop("title", None) + if title: + data["track_name"] = title + + return data diff --git a/platypush/utils/threads.py b/platypush/utils/threads.py index df64a3b9..b48b32b4 100644 --- a/platypush/utils/threads.py +++ b/platypush/utils/threads.py @@ -35,8 +35,11 @@ def OrEvent(*events, cls: Type = threading.Event): e.changed() def _or_set(e): - e._set() - e.changed() + try: + e._set() + e.changed() + except RecursionError: + pass for e in events: _to_or(e, changed)