Refactored and simplified Snapcast backend. Moreover the "grace sleep"

should happen in any case of errors, not only on disconnect errors,
to prevent a tight loop of retries.
This commit is contained in:
Fabio Manganiello 2019-11-05 18:10:41 +01:00
parent 6aa13a10cb
commit f04f42caef

View file

@ -4,7 +4,6 @@ import threading
import time import time
from platypush.backend import Backend from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.utils import set_thread_name from platypush.utils import set_thread_name
from platypush.message.event.music.snapcast import ClientVolumeChangeEvent, \ from platypush.message.event.music.snapcast import ClientVolumeChangeEvent, \
GroupMuteChangeEvent, ClientConnectedEvent, ClientDisconnectedEvent, \ GroupMuteChangeEvent, ClientConnectedEvent, ClientDisconnectedEvent, \
@ -68,16 +67,15 @@ class MusicSnapcastBackend(Backend):
for _ in range(len(ports), len(hosts)): for _ in range(len(ports), len(hosts)):
self.ports.append(self._DEFAULT_SNAPCAST_PORT) self.ports.append(self._DEFAULT_SNAPCAST_PORT)
def _connect(self, host, port): def _connect(self, host, port):
if self._socks.get(host): if self._socks.get(host):
return self._socks[host] return self._socks[host]
self.logger.debug('Connecting to {}:{}'.format(host, port)) self.logger.debug('Connecting to {host}:{port}'.format(host=host, port=port))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port)) sock.connect((host, port))
self._socks[host] = sock self._socks[host] = sock
self.logger.info('Connected to {}:{}'.format(host, port)) self.logger.info('Connected to {host}:{port}'.format(host=host, port=port))
return sock return sock
def _disconnect(self, host, port): def _disconnect(self, host, port):
@ -86,9 +84,13 @@ class MusicSnapcastBackend(Backend):
self.logger.debug('Not connected to {}:{}'.format(host, port)) self.logger.debug('Not connected to {}:{}'.format(host, port))
return return
try: sock.close() try:
except: pass sock.close()
finally: self._socks[host] = None except Exception as e:
self.logger.warning(('Exception while disconnecting from {host}:{port}: {error}'.
format(host=host, port=port, error=str(e))))
finally:
self._socks[host] = None
@classmethod @classmethod
def _recv(cls, sock): def _recv(cls, sock):
@ -139,22 +141,9 @@ class MusicSnapcastBackend(Backend):
return evt return evt
def _client(self, host, port): def _client(self, host, port, thread_name):
def _thread(): def _thread():
set_thread_name('Snapcast-' + host) set_thread_name(thread_name)
try:
self._status(host, port)
except Exception as e:
self.logger.warning(('Exception while getting the status ' +
'of the Snapcast server {}:{}: {}').
format(host, port, str(e)))
try:
self._disconnect(host, port)
time.sleep(self.poll_seconds)
except:
pass
while not self.should_stop(): while not self.should_stop():
try: try:
@ -165,47 +154,22 @@ class MusicSnapcastBackend(Backend):
msgs = [msgs] msgs = [msgs]
for msg in msgs: for msg in msgs:
self.logger.debug('Received message on {}:{}: {}'.format( self.logger.debug('Received message on {host}:{port}: {msg}'.
host, port, msg)) format(host=host, port=port, msg=msg))
evt = self._parse_msg(host=host, msg=msg) evt = self._parse_msg(host=host, msg=msg)
if evt: if evt:
self.bus.post(evt) self.bus.post(evt)
except Exception as e: except Exception as e:
self.logger.warning(('Exception while getting the status ' + self.logger.warning('Exception while getting the status ' + 'of the Snapcast server {}:{}: {}'.
'of the Snapcast server {}:{}: {}').
format(host, port, str(e))) format(host, port, str(e)))
try:
self._disconnect(host, port) self._disconnect(host, port)
except:
pass
finally: finally:
time.sleep(self.poll_seconds) time.sleep(self.poll_seconds)
return _thread return _thread
@classmethod
def _get_req_id(cls):
return get_plugin('music.snapcast')._get_req_id()
def _status(self, host, port):
sock = self._connect(host, port)
request = {
'id': self._get_req_id(),
'jsonrpc':'2.0',
'method':'Server.GetStatus'
}
get_plugin('music.snapcast')._send(sock, request)
try:
return self._recv(sock).get('result', {}).get('server', {})
except Exception as e:
self.logger.warning('Unable to connect to {}:{}: {}'.format(
host, port, str(e)))
self._socks[host] = None
def run(self): def run(self):
super().run() super().run()
@ -215,9 +179,11 @@ class MusicSnapcastBackend(Backend):
while not self.should_stop(): while not self.should_stop():
for i, host in enumerate(self.hosts): for i, host in enumerate(self.hosts):
port = self.ports[i] port = self.ports[i]
thread_name = 'Snapcast-{host}-{port}'.format(host=host, port=port)
self._threads[host] = threading.Thread( self._threads[host] = threading.Thread(
target=self._client(host, port), target=self._client(host, port, thread_name),
name='Snapcast-' + host name=thread_name
) )
self._threads[host].start() self._threads[host].start()