From efad5a2bd7fb6058bc072bd37cb88f6492210b29 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 6 Jan 2019 19:19:30 +0100 Subject: [PATCH] Added Snapcast backend --- platypush/backend/music/mpd/__init__.py | 1 + platypush/backend/music/snapcast.py | 196 ++++++++++++++++++++++ platypush/message/event/music/snapcast.py | 95 +++++++++++ platypush/plugins/music/snapcast.py | 17 +- 4 files changed, 306 insertions(+), 3 deletions(-) create mode 100644 platypush/backend/music/snapcast.py create mode 100644 platypush/message/event/music/snapcast.py diff --git a/platypush/backend/music/mpd/__init__.py b/platypush/backend/music/mpd/__init__.py index 14f42619..57e6558e 100644 --- a/platypush/backend/music/mpd/__init__.py +++ b/platypush/backend/music/mpd/__init__.py @@ -20,6 +20,7 @@ class MusicMpdBackend(Backend): * :class:`platypush.message.event.music.MusicStopEvent` if the playback state changed to stop * :class:`platypush.message.event.music.NewPlayingTrackEvent` if a new track is being played * :class:`platypush.message.event.music.PlaylistChangeEvent` if the main playlist has changed + * :class:`platypush.message.event.music.VolumeChangeEvent` if the main volume has changed Requires: * **python-mpd2** (``pip install python-mpd2``) diff --git a/platypush/backend/music/snapcast.py b/platypush/backend/music/snapcast.py new file mode 100644 index 00000000..f274a583 --- /dev/null +++ b/platypush/backend/music/snapcast.py @@ -0,0 +1,196 @@ +import json +import socket +import threading +import time + +from platypush.backend import Backend +from platypush.context import get_plugin +from platypush.message.event.music.snapcast import ClientVolumeChangeEvent, \ + GroupMuteChangeEvent, ClientConnectedEvent, ClientDisconnectedEvent, \ + ClientLatencyChangeEvent, ClientNameChangeEvent, GroupStreamChangeEvent, \ + StreamUpdateEvent, ServerUpdateEvent + + +class MusicSnapcastBackend(Backend): + """ + Backend that listens for notification and status changes on one or more + [Snapcast](https://github.com/badaix/snapcast) servers. + + Triggers: + + * :class:`platypush.message.event.music.snapcast.ClientConnectedEvent` + * :class:`platypush.message.event.music.snapcast.ClientDisconnectedEvent` + * :class:`platypush.message.event.music.snapcast.ClientVolumeChangeEvent` + * :class:`platypush.message.event.music.snapcast.ClientLatencyChangeEvent` + * :class:`platypush.message.event.music.snapcast.ClientNameChangeEvent` + * :class:`platypush.message.event.music.snapcast.GroupMuteChangeEvent` + * :class:`platypush.message.event.music.snapcast.GroupStreamChangeEvent` + * :class:`platypush.message.event.music.snapcast.StreamUpdateEvent` + * :class:`platypush.message.event.music.snapcast.ServerUpdateEvent` + """ + + _DEFAULT_SNAPCAST_PORT = 1705 + _SOCKET_EOL = '\r\n'.encode() + + def __init__(self, hosts=['localhost'], ports=[_DEFAULT_SNAPCAST_PORT], + *args, **kwargs): + """ + :param hosts: List of Snapcast server names or IPs to monitor (default: + `['localhost']` + :type hosts: list[str] + + :param ports: List of control ports for the configured Snapcast servers + (default: `[1705]`) + :type ports: list[int] + """ + + super().__init__(*args, **kwargs) + + self.hosts = hosts[:] + self.ports = ports[:] + self._socks = {} + self._threads = {} + self._statuses = {} + + if len(hosts) > len(ports): + for _ in range(len(ports), len(hosts)): + self.ports.append(self._DEFAULT_SNAPCAST_PORT) + + + def _connect(self, host, port): + if host in self._socks: + return self._socks[host] + + self.logger.debug('Connecting to {}:{}'.format(host, port)) + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + self._socks[host] = sock + self.logger.info('Connected to {}:{}'.format(host, port)) + return sock + + def _disconnect(self, host, port): + sock = self._socks.get(host) + if not sock: + self.logger.debug('Not connected to {}:{}'.format(host, port)) + return + + try: sock.close() + except: pass + finally: self._socks[host] = None + + @classmethod + def _recv(cls, sock): + buf = b'' + while buf[-2:] != cls._SOCKET_EOL: + buf += sock.recv(1) + return json.loads(buf.decode().strip()) + + @classmethod + def _parse_msg(cls, host, msg): + evt = None + + if msg.get('method') == 'Client.OnVolumeChanged': + client_id = msg.get('params', {}).get('id') + volume = msg.get('params', {}).get('volume', {}).get('percent') + muted = msg.get('params', {}).get('volume', {}).get('muted') + evt = ClientVolumeChangeEvent(host=host, client=client_id, + volume=volume, muted=muted) + elif msg.get('method') == 'Group.OnMute': + group_id = msg.get('params', {}).get('id') + muted = msg.get('params', {}).get('mute') + evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted) + elif msg.get('method') == 'Client.OnConnect': + client = msg.get('params', {}).get('client') + evt = ClientConnectedEvent(host=host, client=client) + elif msg.get('method') == 'Client.OnDisconnect': + client = msg.get('params', {}).get('client') + evt = ClientDisconnectedEvent(host=host, client=client) + elif msg.get('method') == 'Client.OnLatencyChanged': + client = msg.get('params', {}).get('id') + latency = msg.get('params', {}).get('latency') + evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency) + elif msg.get('method') == 'Client.OnNameChanged': + client = msg.get('params', {}).get('id') + name = msg.get('params', {}).get('name') + evt = ClientNameChangeEvent(host=host, client=client, name=name) + elif msg.get('method') == 'Group.OnStreamChanged': + group_id = msg.get('params', {}).get('id') + stream_id = msg.get('params', {}).get('stream_id') + evt = GroupStreamChangeEvent(host=host, group=group, stream=stream) + elif msg.get('method') == 'Stream.OnUpdate': + stream_id = msg.get('params', {}).get('stream_id') + stream = msg.get('params', {}).get('stream') + evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream) + elif msg.get('method') == 'Server.OnUpdate': + server = msg.get('params', {}).get('server') + evt = ServerUpdateEvent(host=host, server=server) + + return evt + + def _client(self, host, port): + def _thread(): + status = self._status(host, port) + + while not self.should_stop(): + try: + sock = self._connect(host, port) + msgs = self._recv(sock) + + if not isinstance(msgs, list): + msgs = [msgs] + + for msg in msgs: + self.logger.debug('Received message on {}:{}: {}'.format( + host, port, msg)) + + evt = self._parse_msg(host=host, msg=msg) + if evt: + self.bus.post(evt) + except Exception as e: + self.logger.warning(('Exception while getting the status ' + + 'of the Snapcast server {}:{}: {}'). + format(host, port, str(e))) + + try: + self._disconnect(host, port) + time.sleep(5) + except: + pass + + return _thread + + @classmethod + def _get_req_id(cls): + return get_plugin('music.snapcast')._get_req_id() + + def _status(self, host, port): + sock = self._connect(host, port) + + request = { + 'id': self._get_req_id(), + 'jsonrpc':'2.0', + 'method':'Server.GetStatus' + } + + get_plugin('music.snapcast')._send(sock, request) + return self._recv(sock).get('result', {}).get('server', {}) + + def run(self): + super().run() + + self.logger.info('Initialized Snapcast backend - hosts: {} ports: {}'. + format(self.hosts, self.ports)) + + while not self.should_stop(): + for i, host in enumerate(self.hosts): + port = self.ports[i] + self._threads[host] = threading.Thread( + target=self._client(host, port)) + + self._threads[host].start() + + for host in self.hosts: + self._threads[host].join() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/event/music/snapcast.py b/platypush/message/event/music/snapcast.py new file mode 100644 index 00000000..d6f51d17 --- /dev/null +++ b/platypush/message/event/music/snapcast.py @@ -0,0 +1,95 @@ +from platypush.message.event import Event + + +class SnapcastEvent(Event): + """ Base class for Snapcast events """ + + def __init__(self, host='localhost', *args, **kwargs): + super().__init__(host=host, *args, **kwargs) + + +class ClientConnectedEvent(SnapcastEvent): + """ + Event fired upon client connection + """ + + def __init__(self, client, host='localhost', *args, **kwargs): + super().__init__(client=client, host=host, *args, **kwargs) + + +class ClientDisconnectedEvent(SnapcastEvent): + """ + Event fired upon client disconnection + """ + + def __init__(self, client, host='localhost', *args, **kwargs): + super().__init__(client=client, host=host, *args, **kwargs) + + +class ClientVolumeChangeEvent(SnapcastEvent): + """ + Event fired upon volume change or mute status change on a client + """ + + def __init__(self, client, volume, muted, host='localhost', *args, **kwargs): + super().__init__(client=client, host=host, volume=volume, + muted=muted, *args, **kwargs) + + +class ClientLatencyChangeEvent(SnapcastEvent): + """ + Event fired upon latency change on a client + """ + + def __init__(self, client, latency, host='localhost', *args, **kwargs): + super().__init__(client=client, host=host, latency=latency, + *args, **kwargs) + + +class ClientNameChangeEvent(SnapcastEvent): + """ + Event fired upon name change of a client + """ + + def __init__(self, client, name, host='localhost', *args, **kwargs): + super().__init__(client=client, host=host, name=name, + *args, **kwargs) + + +class GroupMuteChangeEvent(SnapcastEvent): + """ + Event fired upon mute status change + """ + + def __init__(self, group, muted, host='localhost', *args, **kwargs): + super().__init__(group=group, host=host, muted=muted, *args, **kwargs) + + +class GroupStreamChangeEvent(SnapcastEvent): + """ + Event fired upon group stream change + """ + + def __init__(self, group, stream, host='localhost', *args, **kwargs): + super().__init__(group=group, host=host, stream=stream, *args, **kwargs) + + +class StreamUpdateEvent(SnapcastEvent): + """ + Event fired upon stream update + """ + + def __init__(self, stream_id, stream, host='localhost', *args, **kwargs): + super().__init__(stream_id=stream_id, stream=stream, host=host, *args, **kwargs) + + +class ServerUpdateEvent(SnapcastEvent): + """ + Event fired upon stream update + """ + + def __init__(self, server, host='localhost', *args, **kwargs): + super().__init__(server=server, host=host, *args, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/music/snapcast.py b/platypush/plugins/music/snapcast.py index 6958f1f7..6ccdf775 100644 --- a/platypush/plugins/music/snapcast.py +++ b/platypush/plugins/music/snapcast.py @@ -70,6 +70,7 @@ class MusicSnapcastPlugin(Plugin): for c in clients: if client == c.get('id') or \ + client == c.get('name') or \ client == c.get('host', {}).get('name') or \ client == c.get('host', {}).get('ip'): return g @@ -97,9 +98,9 @@ class MusicSnapcastPlugin(Plugin): return self._recv(sock).get('server', {}) @action - def status(self, host=None, port=None): + def status(self, host=None, port=None, client=None, group=None): """ - Get the status either of a Snapcast server + Get the status either of a Snapcast server, client or group :param host: Snapcast server to query (default: default configured host) :type host: str @@ -107,6 +108,12 @@ class MusicSnapcastPlugin(Plugin): :param port: Snapcast server port (default: default configured port) :type port: int + :param client: Client ID or name (default: None) + :type client: str + + :param group: Group ID or name (default: None) + :type group: str + :returns: dict. Example: .. codeblock:: json @@ -195,6 +202,11 @@ class MusicSnapcastPlugin(Plugin): try: sock = self._connect(host or self.host, port or self.port) + if client: + return self._get_client(sock, client) + if group: + return self._get_group(sock, group) + return self._status(sock) finally: try: sock.close() @@ -478,4 +490,3 @@ class MusicSnapcastPlugin(Plugin): # vim:sw=4:ts=4:et: -