diff --git a/platypush/backend/music/mopidy.py b/platypush/backend/music/mopidy.py index eccd8083..539c2d4e 100644 --- a/platypush/backend/music/mopidy.py +++ b/platypush/backend/music/mopidy.py @@ -1,4 +1,3 @@ -import asyncio import json import queue import re @@ -46,21 +45,7 @@ class MusicMopidyBackend(Backend): self.url = 'ws://{}:{}/mopidy/ws'.format(host, port) self._msg_id = 0 self._latest_status = self._get_tracklist_status() - - async def _poll_events(self): - import websockets - - try: - while not self.should_stop(): - async with websockets.connect(self.url) as ws: - msg = await ws.recv() - if isinstance(msg, bytes): - msg = msg.decode() - self._handle_msg(msg) - except Exception as e: - self.logger.warning('The Mopidy backend raised an exception') - self.logger.exception(e) - time.sleep(2) # Wait a bit before retrying + self._ws = None def _parse_track(self, track, pos=None): if not track: @@ -90,7 +75,7 @@ class MusicMopidyBackend(Backend): def _communicate(self, msg): - import websockets + import websocket if isinstance(msg, str): msg = json.loads(msg) @@ -99,21 +84,11 @@ class MusicMopidyBackend(Backend): msg['jsonrpc'] = '2.0' msg['id'] = self._msg_id msg = json.dumps(msg) - resp_queue = queue.Queue() - async def get_response(): - async with websockets.connect(self.url) as ws: - await ws.send(msg) - response = await ws.recv() - if isinstance(response, bytes): - response = response.decode() - resp_queue.put(json.loads(response)) + ws = websocket.create_connection(self.url) + ws.send(msg) + return json.loads(ws.recv()).get('result') - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(get_response()) - loop.stop() - return resp_queue.get().get('result') def _get_tracklist_status(self): return { @@ -127,87 +102,128 @@ class MusicMopidyBackend(Backend): 'method': 'core.tracklist.get_consume'}), } - def _handle_msg(self, msg): - msg = json.loads(msg) - event = msg.get('event') - if not event: - return + async def _poll_events(self): + import websocket - status = {} - track = msg.get('tl_track', {}) + try: + while not self.should_stop(): + async with websockets.connect(self.url) as ws: + msg = await ws.recv() + if isinstance(msg, bytes): + msg = msg.decode() + self._handle_msg(msg) + except Exception as e: + self.logger.warning('The Mopidy backend raised an exception') + self.logger.exception(e) + time.sleep(2) # Wait a bit before retrying - if event == 'track_playback_paused': - status['state'] = 'pause' - track = self._parse_track(track) - if not track: - return - self.bus.post(MusicPauseEvent(status=status, track=track)) - elif event == 'track_playback_resumed': - status['state'] = 'play' - track = self._parse_track(track) - if not track: - return - self.bus.post(MusicPlayEvent(status=status, track=track)) - elif event == 'track_playback_ended': - status['state'] = 'stop' - track = self._parse_track(track) - if not track: - return - self.bus.post(MusicPlayEvent(status=status, track=track)) - elif event == 'track_playback_started': - track = self._parse_track(track) - if not track: - return - self.bus.post(NewPlayingTrackEvent(status=status, track=track)) - elif event == 'stream_title_changed': - m = re.match('^\s*(.+?)\s+-\s+(.*)\s*$', msg.get('title', '')) - if not m: + def _on_msg(self): + def hndl(msg): + msg = json.loads(msg) + event = msg.get('event') + if not event: return - track['artist'] = m.group(1) - track['title'] = m.group(2) - self.bus.post(NewPlayingTrackEvent(status=status, track=track)) - elif event == 'volume_changed': - status['volume'] = msg.get('volume') - self.bus.post(VolumeChangeEvent(volume=status['volume'], + status = {} + track = msg.get('tl_track', {}) + + if event == 'track_playback_paused': + status['state'] = 'pause' + track = self._parse_track(track) + if not track: + return + self.bus.post(MusicPauseEvent(status=status, track=track)) + elif event == 'track_playback_resumed': + status['state'] = 'play' + track = self._parse_track(track) + if not track: + return + self.bus.post(MusicPlayEvent(status=status, track=track)) + elif event == 'track_playback_ended': + status['state'] = 'stop' + track = self._parse_track(track) + if not track: + return + self.bus.post(MusicPlayEvent(status=status, track=track)) + elif event == 'track_playback_started': + track = self._parse_track(track) + if not track: + return + self.bus.post(NewPlayingTrackEvent(status=status, track=track)) + elif event == 'stream_title_changed': + m = re.match('^\s*(.+?)\s+-\s+(.*)\s*$', msg.get('title', '')) + if not m: + return + + track['artist'] = m.group(1) + track['title'] = m.group(2) + self.bus.post(NewPlayingTrackEvent(status=status, track=track)) + elif event == 'volume_changed': + status['volume'] = msg.get('volume') + self.bus.post(VolumeChangeEvent(volume=status['volume'], + status=status, track=track)) + elif event == 'mute_changed': + status['mute'] = msg.get('mute') + self.bus.post(MuteChangeEvent(mute=status['mute'], status=status, track=track)) - elif event == 'mute_changed': - status['mute'] = msg.get('mute') - self.bus.post(MuteChangeEvent(mute=status['mute'], - status=status, track=track)) - elif event == 'seeked': - status['position'] = msg.get('time_position') - self.bus.post(SeekChangeEvent(position=status['position'], - status=status, track=track)) - elif event == 'tracklist_changed': - tracklist = [self._parse_track(t, pos=i) - for i, t in self._communicate({ - 'method': 'core.tracklist.get_tl_tracks' })] + elif event == 'seeked': + status['position'] = msg.get('time_position') + self.bus.post(SeekChangeEvent(position=status['position'], + status=status, track=track)) + elif event == 'tracklist_changed': + tracklist = [self._parse_track(t, pos=i) + for i, t in self._communicate({ + 'method': 'core.tracklist.get_tl_tracks' })] - self.bus.post(PlaylistChangeEvent(changes=tracklist)) - elif event == 'options_changed': - new_status = self._get_tracklist_status() - if new_status['random'] != self._latest_status['random']: - self.bus.post(PlaybackRandomModeChangeEvent(state=new_status['random'])) - if new_status['repeat'] != self._latest_status['repeat']: - self.bus.post(PlaybackRepeatModeChangeEvent(state=new_status['repeat'])) - if new_status['single'] != self._latest_status['single']: - self.bus.post(PlaybackSingleModeChangeEvent(state=new_status['single'])) - if new_status['consume'] != self._latest_status['consume']: - self.bus.post(PlaybackConsumeModeChangeEvent(state=new_status['consume'])) + self.bus.post(PlaylistChangeEvent(changes=tracklist)) + elif event == 'options_changed': + new_status = self._get_tracklist_status() + if new_status['random'] != self._latest_status['random']: + self.bus.post(PlaybackRandomModeChangeEvent(state=new_status['random'])) + if new_status['repeat'] != self._latest_status['repeat']: + self.bus.post(PlaybackRepeatModeChangeEvent(state=new_status['repeat'])) + if new_status['single'] != self._latest_status['single']: + self.bus.post(PlaybackSingleModeChangeEvent(state=new_status['single'])) + if new_status['consume'] != self._latest_status['consume']: + self.bus.post(PlaybackConsumeModeChangeEvent(state=new_status['consume'])) - self._latest_status = new_status + self._latest_status = new_status + + return hndl + + def _on_error(self): + def hndl(error): + self.logger.warning('Mopidy websocket error: {}'.format(error)) + return hndl + + def _on_close(self): + def hndl(): + self._ws = None + self.logger.warning('Mopidy websocket connection closed') + time.sleep(5) + self._connect() + return hndl + + def _on_open(self): + def hndl(): + self.logger.info('Mopidy websocket connected') + return hndl + + def _connect(self): + if not self._ws: + self._ws = websocket.WebSocketApp(self.url, + on_message=self._on_msg(), + on_error=self._on_error(), + on_close=self._on_close()) def run(self): super().run() - self.logger.info('Started Mopidy events backend on {}:{}'.format( - self.host, self.port)) + self.logger.info('Started tracking Mopidy events backend on {}:{}'. + format(self.host, self.port)) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - while not self.should_stop(): - loop.run_until_complete(self._poll_events()) + self._connect() + self._ws.on_open = self._on_open() + self._ws.run_forever() # vim:sw=4:ts=4:et: diff --git a/requirements.txt b/requirements.txt index de9690c6..70e6d6cc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -128,3 +128,6 @@ inputs # git+https://github.com/agonzalezro/python-opensubtitles#egg=python-opensubtitles # webvtt-py +# Mopidy backend +websocket-client + diff --git a/setup.py b/setup.py index 26b2cb9f..5dd6f82f 100755 --- a/setup.py +++ b/setup.py @@ -96,6 +96,7 @@ setup( 'Support for Chromecast plugin': ['pychromecast'], 'Support for sound devices': ['sounddevice', 'soundfile', 'numpy'], 'Support for web media subtitles': ['webvtt-py'] + 'Support for mopidy backend': ['websocket-client'], # 'Support for Leap Motion backend': ['git+ssh://git@github.com:BlackLight/leap-sdk-python3.git'], # 'Support for Flic buttons': ['git+https://@github.com/50ButtonsEach/fliclib-linux-hci.git'] # 'Support for media subtitles': ['git+https://github.com/agonzalezro/python-opensubtitles#egg=python-opensubtitles']