platypush/platypush/backend/music/snapcast.py
2019-01-06 19:19:30 +01:00

196 lines
7.3 KiB
Python

import json
import socket
import threading
import time
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.music.snapcast import ClientVolumeChangeEvent, \
GroupMuteChangeEvent, ClientConnectedEvent, ClientDisconnectedEvent, \
ClientLatencyChangeEvent, ClientNameChangeEvent, GroupStreamChangeEvent, \
StreamUpdateEvent, ServerUpdateEvent
class MusicSnapcastBackend(Backend):
"""
Backend that listens for notification and status changes on one or more
[Snapcast](https://github.com/badaix/snapcast) servers.
Triggers:
* :class:`platypush.message.event.music.snapcast.ClientConnectedEvent`
* :class:`platypush.message.event.music.snapcast.ClientDisconnectedEvent`
* :class:`platypush.message.event.music.snapcast.ClientVolumeChangeEvent`
* :class:`platypush.message.event.music.snapcast.ClientLatencyChangeEvent`
* :class:`platypush.message.event.music.snapcast.ClientNameChangeEvent`
* :class:`platypush.message.event.music.snapcast.GroupMuteChangeEvent`
* :class:`platypush.message.event.music.snapcast.GroupStreamChangeEvent`
* :class:`platypush.message.event.music.snapcast.StreamUpdateEvent`
* :class:`platypush.message.event.music.snapcast.ServerUpdateEvent`
"""
_DEFAULT_SNAPCAST_PORT = 1705
_SOCKET_EOL = '\r\n'.encode()
def __init__(self, hosts=['localhost'], ports=[_DEFAULT_SNAPCAST_PORT],
*args, **kwargs):
"""
:param hosts: List of Snapcast server names or IPs to monitor (default:
`['localhost']`
:type hosts: list[str]
:param ports: List of control ports for the configured Snapcast servers
(default: `[1705]`)
:type ports: list[int]
"""
super().__init__(*args, **kwargs)
self.hosts = hosts[:]
self.ports = ports[:]
self._socks = {}
self._threads = {}
self._statuses = {}
if len(hosts) > len(ports):
for _ in range(len(ports), len(hosts)):
self.ports.append(self._DEFAULT_SNAPCAST_PORT)
def _connect(self, host, port):
if host in self._socks:
return self._socks[host]
self.logger.debug('Connecting to {}:{}'.format(host, port))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
self._socks[host] = sock
self.logger.info('Connected to {}:{}'.format(host, port))
return sock
def _disconnect(self, host, port):
sock = self._socks.get(host)
if not sock:
self.logger.debug('Not connected to {}:{}'.format(host, port))
return
try: sock.close()
except: pass
finally: self._socks[host] = None
@classmethod
def _recv(cls, sock):
buf = b''
while buf[-2:] != cls._SOCKET_EOL:
buf += sock.recv(1)
return json.loads(buf.decode().strip())
@classmethod
def _parse_msg(cls, host, msg):
evt = None
if msg.get('method') == 'Client.OnVolumeChanged':
client_id = msg.get('params', {}).get('id')
volume = msg.get('params', {}).get('volume', {}).get('percent')
muted = msg.get('params', {}).get('volume', {}).get('muted')
evt = ClientVolumeChangeEvent(host=host, client=client_id,
volume=volume, muted=muted)
elif msg.get('method') == 'Group.OnMute':
group_id = msg.get('params', {}).get('id')
muted = msg.get('params', {}).get('mute')
evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted)
elif msg.get('method') == 'Client.OnConnect':
client = msg.get('params', {}).get('client')
evt = ClientConnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnDisconnect':
client = msg.get('params', {}).get('client')
evt = ClientDisconnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnLatencyChanged':
client = msg.get('params', {}).get('id')
latency = msg.get('params', {}).get('latency')
evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency)
elif msg.get('method') == 'Client.OnNameChanged':
client = msg.get('params', {}).get('id')
name = msg.get('params', {}).get('name')
evt = ClientNameChangeEvent(host=host, client=client, name=name)
elif msg.get('method') == 'Group.OnStreamChanged':
group_id = msg.get('params', {}).get('id')
stream_id = msg.get('params', {}).get('stream_id')
evt = GroupStreamChangeEvent(host=host, group=group, stream=stream)
elif msg.get('method') == 'Stream.OnUpdate':
stream_id = msg.get('params', {}).get('stream_id')
stream = msg.get('params', {}).get('stream')
evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream)
elif msg.get('method') == 'Server.OnUpdate':
server = msg.get('params', {}).get('server')
evt = ServerUpdateEvent(host=host, server=server)
return evt
def _client(self, host, port):
def _thread():
status = self._status(host, port)
while not self.should_stop():
try:
sock = self._connect(host, port)
msgs = self._recv(sock)
if not isinstance(msgs, list):
msgs = [msgs]
for msg in msgs:
self.logger.debug('Received message on {}:{}: {}'.format(
host, port, msg))
evt = self._parse_msg(host=host, msg=msg)
if evt:
self.bus.post(evt)
except Exception as e:
self.logger.warning(('Exception while getting the status ' +
'of the Snapcast server {}:{}: {}').
format(host, port, str(e)))
try:
self._disconnect(host, port)
time.sleep(5)
except:
pass
return _thread
@classmethod
def _get_req_id(cls):
return get_plugin('music.snapcast')._get_req_id()
def _status(self, host, port):
sock = self._connect(host, port)
request = {
'id': self._get_req_id(),
'jsonrpc':'2.0',
'method':'Server.GetStatus'
}
get_plugin('music.snapcast')._send(sock, request)
return self._recv(sock).get('result', {}).get('server', {})
def run(self):
super().run()
self.logger.info('Initialized Snapcast backend - hosts: {} ports: {}'.
format(self.hosts, self.ports))
while not self.should_stop():
for i, host in enumerate(self.hosts):
port = self.ports[i]
self._threads[host] = threading.Thread(
target=self._client(host, port))
self._threads[host].start()
for host in self.hosts:
self._threads[host].join()
# vim:sw=4:ts=4:et: