Switched mopidy backend from websockets to websocket-client

This commit is contained in:
Fabio Manganiello 2019-02-12 22:52:29 +01:00
parent b555777cc8
commit ee3df8fe8c
3 changed files with 120 additions and 100 deletions

View file

@ -1,4 +1,3 @@
import asyncio
import json import json
import queue import queue
import re import re
@ -46,21 +45,7 @@ class MusicMopidyBackend(Backend):
self.url = 'ws://{}:{}/mopidy/ws'.format(host, port) self.url = 'ws://{}:{}/mopidy/ws'.format(host, port)
self._msg_id = 0 self._msg_id = 0
self._latest_status = self._get_tracklist_status() self._latest_status = self._get_tracklist_status()
self._ws = None
async def _poll_events(self):
import websockets
try:
while not self.should_stop():
async with websockets.connect(self.url) as ws:
msg = await ws.recv()
if isinstance(msg, bytes):
msg = msg.decode()
self._handle_msg(msg)
except Exception as e:
self.logger.warning('The Mopidy backend raised an exception')
self.logger.exception(e)
time.sleep(2) # Wait a bit before retrying
def _parse_track(self, track, pos=None): def _parse_track(self, track, pos=None):
if not track: if not track:
@ -90,7 +75,7 @@ class MusicMopidyBackend(Backend):
def _communicate(self, msg): def _communicate(self, msg):
import websockets import websocket
if isinstance(msg, str): if isinstance(msg, str):
msg = json.loads(msg) msg = json.loads(msg)
@ -99,21 +84,11 @@ class MusicMopidyBackend(Backend):
msg['jsonrpc'] = '2.0' msg['jsonrpc'] = '2.0'
msg['id'] = self._msg_id msg['id'] = self._msg_id
msg = json.dumps(msg) msg = json.dumps(msg)
resp_queue = queue.Queue()
async def get_response(): ws = websocket.create_connection(self.url)
async with websockets.connect(self.url) as ws: ws.send(msg)
await ws.send(msg) return json.loads(ws.recv()).get('result')
response = await ws.recv()
if isinstance(response, bytes):
response = response.decode()
resp_queue.put(json.loads(response))
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(get_response())
loop.stop()
return resp_queue.get().get('result')
def _get_tracklist_status(self): def _get_tracklist_status(self):
return { return {
@ -127,87 +102,128 @@ class MusicMopidyBackend(Backend):
'method': 'core.tracklist.get_consume'}), 'method': 'core.tracklist.get_consume'}),
} }
def _handle_msg(self, msg): async def _poll_events(self):
msg = json.loads(msg) import websocket
event = msg.get('event')
if not event:
return
status = {} try:
track = msg.get('tl_track', {}) while not self.should_stop():
async with websockets.connect(self.url) as ws:
msg = await ws.recv()
if isinstance(msg, bytes):
msg = msg.decode()
self._handle_msg(msg)
except Exception as e:
self.logger.warning('The Mopidy backend raised an exception')
self.logger.exception(e)
time.sleep(2) # Wait a bit before retrying
if event == 'track_playback_paused': def _on_msg(self):
status['state'] = 'pause' def hndl(msg):
track = self._parse_track(track) msg = json.loads(msg)
if not track: event = msg.get('event')
return if not event:
self.bus.post(MusicPauseEvent(status=status, track=track))
elif event == 'track_playback_resumed':
status['state'] = 'play'
track = self._parse_track(track)
if not track:
return
self.bus.post(MusicPlayEvent(status=status, track=track))
elif event == 'track_playback_ended':
status['state'] = 'stop'
track = self._parse_track(track)
if not track:
return
self.bus.post(MusicPlayEvent(status=status, track=track))
elif event == 'track_playback_started':
track = self._parse_track(track)
if not track:
return
self.bus.post(NewPlayingTrackEvent(status=status, track=track))
elif event == 'stream_title_changed':
m = re.match('^\s*(.+?)\s+-\s+(.*)\s*$', msg.get('title', ''))
if not m:
return return
track['artist'] = m.group(1) status = {}
track['title'] = m.group(2) track = msg.get('tl_track', {})
self.bus.post(NewPlayingTrackEvent(status=status, track=track))
elif event == 'volume_changed': if event == 'track_playback_paused':
status['volume'] = msg.get('volume') status['state'] = 'pause'
self.bus.post(VolumeChangeEvent(volume=status['volume'], track = self._parse_track(track)
if not track:
return
self.bus.post(MusicPauseEvent(status=status, track=track))
elif event == 'track_playback_resumed':
status['state'] = 'play'
track = self._parse_track(track)
if not track:
return
self.bus.post(MusicPlayEvent(status=status, track=track))
elif event == 'track_playback_ended':
status['state'] = 'stop'
track = self._parse_track(track)
if not track:
return
self.bus.post(MusicPlayEvent(status=status, track=track))
elif event == 'track_playback_started':
track = self._parse_track(track)
if not track:
return
self.bus.post(NewPlayingTrackEvent(status=status, track=track))
elif event == 'stream_title_changed':
m = re.match('^\s*(.+?)\s+-\s+(.*)\s*$', msg.get('title', ''))
if not m:
return
track['artist'] = m.group(1)
track['title'] = m.group(2)
self.bus.post(NewPlayingTrackEvent(status=status, track=track))
elif event == 'volume_changed':
status['volume'] = msg.get('volume')
self.bus.post(VolumeChangeEvent(volume=status['volume'],
status=status, track=track))
elif event == 'mute_changed':
status['mute'] = msg.get('mute')
self.bus.post(MuteChangeEvent(mute=status['mute'],
status=status, track=track)) status=status, track=track))
elif event == 'mute_changed': elif event == 'seeked':
status['mute'] = msg.get('mute') status['position'] = msg.get('time_position')
self.bus.post(MuteChangeEvent(mute=status['mute'], self.bus.post(SeekChangeEvent(position=status['position'],
status=status, track=track)) status=status, track=track))
elif event == 'seeked': elif event == 'tracklist_changed':
status['position'] = msg.get('time_position') tracklist = [self._parse_track(t, pos=i)
self.bus.post(SeekChangeEvent(position=status['position'], for i, t in self._communicate({
status=status, track=track)) 'method': 'core.tracklist.get_tl_tracks' })]
elif event == 'tracklist_changed':
tracklist = [self._parse_track(t, pos=i)
for i, t in self._communicate({
'method': 'core.tracklist.get_tl_tracks' })]
self.bus.post(PlaylistChangeEvent(changes=tracklist)) self.bus.post(PlaylistChangeEvent(changes=tracklist))
elif event == 'options_changed': elif event == 'options_changed':
new_status = self._get_tracklist_status() new_status = self._get_tracklist_status()
if new_status['random'] != self._latest_status['random']: if new_status['random'] != self._latest_status['random']:
self.bus.post(PlaybackRandomModeChangeEvent(state=new_status['random'])) self.bus.post(PlaybackRandomModeChangeEvent(state=new_status['random']))
if new_status['repeat'] != self._latest_status['repeat']: if new_status['repeat'] != self._latest_status['repeat']:
self.bus.post(PlaybackRepeatModeChangeEvent(state=new_status['repeat'])) self.bus.post(PlaybackRepeatModeChangeEvent(state=new_status['repeat']))
if new_status['single'] != self._latest_status['single']: if new_status['single'] != self._latest_status['single']:
self.bus.post(PlaybackSingleModeChangeEvent(state=new_status['single'])) self.bus.post(PlaybackSingleModeChangeEvent(state=new_status['single']))
if new_status['consume'] != self._latest_status['consume']: if new_status['consume'] != self._latest_status['consume']:
self.bus.post(PlaybackConsumeModeChangeEvent(state=new_status['consume'])) self.bus.post(PlaybackConsumeModeChangeEvent(state=new_status['consume']))
self._latest_status = new_status self._latest_status = new_status
return hndl
def _on_error(self):
def hndl(error):
self.logger.warning('Mopidy websocket error: {}'.format(error))
return hndl
def _on_close(self):
def hndl():
self._ws = None
self.logger.warning('Mopidy websocket connection closed')
time.sleep(5)
self._connect()
return hndl
def _on_open(self):
def hndl():
self.logger.info('Mopidy websocket connected')
return hndl
def _connect(self):
if not self._ws:
self._ws = websocket.WebSocketApp(self.url,
on_message=self._on_msg(),
on_error=self._on_error(),
on_close=self._on_close())
def run(self): def run(self):
super().run() super().run()
self.logger.info('Started Mopidy events backend on {}:{}'.format( self.logger.info('Started tracking Mopidy events backend on {}:{}'.
self.host, self.port)) format(self.host, self.port))
loop = asyncio.new_event_loop() self._connect()
asyncio.set_event_loop(loop) self._ws.on_open = self._on_open()
self._ws.run_forever()
while not self.should_stop():
loop.run_until_complete(self._poll_events())
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -128,3 +128,6 @@ inputs
# git+https://github.com/agonzalezro/python-opensubtitles#egg=python-opensubtitles # git+https://github.com/agonzalezro/python-opensubtitles#egg=python-opensubtitles
# webvtt-py # webvtt-py
# Mopidy backend
websocket-client

View file

@ -96,6 +96,7 @@ setup(
'Support for Chromecast plugin': ['pychromecast'], 'Support for Chromecast plugin': ['pychromecast'],
'Support for sound devices': ['sounddevice', 'soundfile', 'numpy'], 'Support for sound devices': ['sounddevice', 'soundfile', 'numpy'],
'Support for web media subtitles': ['webvtt-py'] 'Support for web media subtitles': ['webvtt-py']
'Support for mopidy backend': ['websocket-client'],
# 'Support for Leap Motion backend': ['git+ssh://git@github.com:BlackLight/leap-sdk-python3.git'], # 'Support for Leap Motion backend': ['git+ssh://git@github.com:BlackLight/leap-sdk-python3.git'],
# 'Support for Flic buttons': ['git+https://@github.com/50ButtonsEach/fliclib-linux-hci.git'] # 'Support for Flic buttons': ['git+https://@github.com/50ButtonsEach/fliclib-linux-hci.git']
# 'Support for media subtitles': ['git+https://github.com/agonzalezro/python-opensubtitles#egg=python-opensubtitles'] # 'Support for media subtitles': ['git+https://github.com/agonzalezro/python-opensubtitles#egg=python-opensubtitles']