[snapcast] Merged backend logic into the plugin.

Closes: #283
This commit is contained in:
Fabio Manganiello 2023-11-10 02:48:07 +01:00
parent 6c3edb73f9
commit ceae310901
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 382 additions and 343 deletions

View file

@ -103,5 +103,9 @@ export default {
cursor: pointer; cursor: pointer;
} }
} }
.slider-container {
padding-right: 1em;
}
} }
</style> </style>

View file

@ -78,7 +78,13 @@ export default {
methods: { methods: {
parseServerStatus(status) { parseServerStatus(status) {
status.server.host.port = this.ports[status.server.host.name] status.server = status.server || {
host: status.server.host || {
name: status.host
}
}
status.server.host.port = this.ports[status.host]
this.hosts[status.server.host.name] = { this.hosts[status.server.host.name] = {
...status, ...status,
groups: status.groups.map((group) => { groups: status.groups.map((group) => {
@ -105,14 +111,11 @@ export default {
this.loading = true this.loading = true
try { try {
const hosts = await this.request('music.snapcast.get_backend_hosts') const statuses = await this.request('music.snapcast.status')
const statuses = await Promise.all(Object.keys(hosts).map(
async (host) => this.request('music.snapcast.status', {host: host, port: hosts[host]})
))
this.hosts = {} this.hosts = {}
statuses.forEach((status) => { statuses.forEach((status) => {
this.ports[status.server.host.name] = hosts[status.server.host.name] this.ports[status.host] = status.port
this.parseServerStatus(status) this.parseServerStatus(status)
}) })
} finally { } finally {
@ -497,4 +500,18 @@ export default {
} }
} }
} }
:deep(.modal-container) {
.modal {
.content {
@include until($tablet) {
width: 95vw;
}
@include from($tablet) {
min-width: 600px;
}
}
}
}
</style> </style>

View file

@ -1,227 +0,0 @@
import json
import select
import socket
import threading
import time
from platypush.backend import Backend
from platypush.message.event.music.snapcast import (
ClientVolumeChangeEvent,
GroupMuteChangeEvent,
ClientConnectedEvent,
ClientDisconnectedEvent,
ClientLatencyChangeEvent,
ClientNameChangeEvent,
GroupStreamChangeEvent,
StreamUpdateEvent,
ServerUpdateEvent,
)
class MusicSnapcastBackend(Backend):
"""
Backend that listens for notification and status changes on one or more
`Snapcast <https://github.com/badaix/snapcast>`_ servers.
"""
_DEFAULT_SNAPCAST_PORT = 1705
_DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds
_SOCKET_EOL = '\r\n'.encode()
def __init__(
self,
hosts=None,
ports=None,
poll_seconds=_DEFAULT_POLL_SECONDS,
*args,
**kwargs,
):
"""
:param hosts: List of Snapcast server names or IPs to monitor (default:
`['localhost']`
:type hosts: list[str]
:param ports: List of control ports for the configured Snapcast servers
(default: `[1705]`)
:type ports: list[int]
:param poll_seconds: How often the backend will poll remote servers for
status updated (default: 10 seconds)
:type poll_seconds: float
"""
super().__init__(*args, **kwargs)
if hosts is None:
hosts = ['localhost']
if ports is None:
ports = [self._DEFAULT_SNAPCAST_PORT]
self.hosts = hosts[:]
self.ports = ports[:]
self.poll_seconds = poll_seconds
self._socks = {}
self._threads = {}
self._statuses = {}
if len(hosts) > len(ports):
for _ in range(len(ports), len(hosts)):
self.ports.append(self._DEFAULT_SNAPCAST_PORT)
def _connect(self, host, port):
if self._socks.get(host):
return self._socks[host]
self.logger.debug('Connecting to %s:%d', host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
self._socks[host] = sock
self.logger.info('Connected to %s:%d', host, port)
return sock
def _disconnect(self, host, port):
sock = self._socks.get(host)
if not sock:
self.logger.debug('Not connected to %s:%d', host, port)
return
try:
sock.close()
except Exception as e:
self.logger.warning(
'Exception while disconnecting from %s:%d: %s', host, port, e
)
finally:
self._socks[host] = None
@classmethod
def _recv(cls, sock):
sock.setblocking(0)
buf = b''
while buf[-2:] != cls._SOCKET_EOL:
ready = select.select([sock], [], [], 0.5)
if ready[0]:
buf += sock.recv(1)
else:
return None
return json.loads(buf.decode().strip())
@classmethod
def _parse_msg(cls, host, msg):
evt = None
if msg.get('method') == 'Client.OnVolumeChanged':
client_id = msg.get('params', {}).get('id')
volume = msg.get('params', {}).get('volume', {}).get('percent')
muted = msg.get('params', {}).get('volume', {}).get('muted')
evt = ClientVolumeChangeEvent(
host=host, client=client_id, volume=volume, muted=muted
)
elif msg.get('method') == 'Group.OnMute':
group_id = msg.get('params', {}).get('id')
muted = msg.get('params', {}).get('mute')
evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted)
elif msg.get('method') == 'Client.OnConnect':
client = msg.get('params', {}).get('client')
evt = ClientConnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnDisconnect':
client = msg.get('params', {}).get('client')
evt = ClientDisconnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnLatencyChanged':
client = msg.get('params', {}).get('id')
latency = msg.get('params', {}).get('latency')
evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency)
elif msg.get('method') == 'Client.OnNameChanged':
client = msg.get('params', {}).get('id')
name = msg.get('params', {}).get('name')
evt = ClientNameChangeEvent(host=host, client=client, name=name)
elif msg.get('method') == 'Group.OnStreamChanged':
group_id = msg.get('params', {}).get('id')
stream_id = msg.get('params', {}).get('stream_id')
evt = GroupStreamChangeEvent(host=host, group=group_id, stream=stream_id)
elif msg.get('method') == 'Stream.OnUpdate':
stream_id = msg.get('params', {}).get('stream_id')
stream = msg.get('params', {}).get('stream')
evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream)
elif msg.get('method') == 'Server.OnUpdate':
server = msg.get('params', {}).get('server')
evt = ServerUpdateEvent(host=host, server=server)
return evt
def _client(self, host, port):
def _thread():
while not self.should_stop():
try:
sock = self._connect(host, port)
msgs = self._recv(sock)
if msgs is None:
continue
if not isinstance(msgs, list):
msgs = [msgs]
for msg in msgs:
self.logger.debug(
'Received message on {host}:{port}: {msg}'.format(
host=host, port=port, msg=msg
)
)
evt = self._parse_msg(host=host, msg=msg)
if evt:
self.bus.post(evt)
except Exception as e:
self.logger.warning(
'Exception while getting the status '
+ 'of the Snapcast server {}:{}: {}'.format(host, port, str(e))
)
self._disconnect(host, port)
finally:
time.sleep(self.poll_seconds)
return _thread
def run(self):
super().run()
self.logger.info(
'Initialized Snapcast backend - hosts: {} ports: {}'.format(
self.hosts, self.ports
)
)
while not self.should_stop():
for i, host in enumerate(self.hosts):
port = self.ports[i]
thread_name = f'Snapcast-{host}-{port}'
self._threads[host] = threading.Thread(
target=self._client(host, port), name=thread_name
)
self._threads[host].start()
for host in self.hosts:
self._threads[host].join()
self.logger.info('Snapcast backend terminated')
def on_stop(self):
self.logger.info('Received STOP event on the Snapcast backend')
for host, sock in self._socks.items():
if sock:
try:
sock.close()
except Exception as e:
self.logger.warning(
'Could not close Snapcast connection to {}: {}: {}'.format(
host, type(e), str(e)
)
)
# vim:sw=4:ts=4:et:

View file

@ -1,15 +0,0 @@
manifest:
events:
platypush.message.event.music.snapcast.ClientConnectedEvent: ''
platypush.message.event.music.snapcast.ClientDisconnectedEvent: ''
platypush.message.event.music.snapcast.ClientLatencyChangeEvent: ''
platypush.message.event.music.snapcast.ClientNameChangeEvent: ''
platypush.message.event.music.snapcast.ClientVolumeChangeEvent: ''
platypush.message.event.music.snapcast.GroupMuteChangeEvent: ''
platypush.message.event.music.snapcast.GroupStreamChangeEvent: ''
platypush.message.event.music.snapcast.ServerUpdateEvent: ''
platypush.message.event.music.snapcast.StreamUpdateEvent: ''
install:
pip: []
package: platypush.backend.music.snapcast
type: backend

View file

@ -1,14 +1,28 @@
import json import json
import select
import socket import socket
import threading import threading
from typing import Collection, Optional, Union import time
from concurrent.futures import ThreadPoolExecutor
from typing import Collection, Iterable, List, Optional, Tuple, Union
from platypush.config import Config from platypush.config import Config
from platypush.context import get_backend from platypush.context import get_bus
from platypush.plugins import Plugin, action from platypush.message.event.music.snapcast import (
ClientVolumeChangeEvent,
GroupMuteChangeEvent,
ClientConnectedEvent,
ClientDisconnectedEvent,
ClientLatencyChangeEvent,
ClientNameChangeEvent,
GroupStreamChangeEvent,
StreamUpdateEvent,
ServerUpdateEvent,
)
from platypush.plugins import RunnablePlugin, action
class MusicSnapcastPlugin(Plugin): class MusicSnapcastPlugin(RunnablePlugin):
""" """
Plugin to interact with a `Snapcast <https://github.com/badaix/snapcast>`_ Plugin to interact with a `Snapcast <https://github.com/badaix/snapcast>`_
instance, control clients mute status, volume, playback etc. instance, control clients mute status, volume, playback etc.
@ -18,68 +32,241 @@ class MusicSnapcastPlugin(Plugin):
""" """
_DEFAULT_SNAPCAST_PORT = 1705 _DEFAULT_SNAPCAST_PORT = 1705
_DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds
_SOCKET_EOL = '\r\n'.encode() _SOCKET_EOL = '\r\n'.encode()
def __init__( def __init__(
self, host: str = 'localhost', port: int = _DEFAULT_SNAPCAST_PORT, **kwargs self,
host: Optional[str] = None,
port: int = _DEFAULT_SNAPCAST_PORT,
hosts: Optional[Iterable[dict]] = None,
poll_interval: Optional[float] = _DEFAULT_POLL_SECONDS,
**kwargs,
): ):
""" """
:param host: Default Snapcast server host (default: localhost) :param host: Default Snapcast server host.
:param port: Default Snapcast server control port (default: 1705) :param port: Default Snapcast server control port (default: 1705).
""" :param hosts: If specified, then the provided list of Snapcast servers
super().__init__(**kwargs) will be monitored, rather than just the one provided on ``host``.
This setting can be used either in conjunction with ``host`` (in
that case, if the ``host`` is not specified on a request then
``host`` will be used as a fallback), or on its own (in that case
requests with no host specified will target the first server in the
list). Note however that either ``host`` or ``hosts`` must be
provided. Format:
self.host = host .. code-block:: yaml
self.port = port
hosts:
- host: localhost
port: 1705 # Default port
- host: snapcast.example.com
port: 9999
:param poll_seconds: How often the plugin will poll remote servers for
status updates (default: 10 seconds).
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self._hosts = self._get_hosts(host=host, port=port, hosts=hosts)
assert self._hosts, 'No Snapcast hosts specified'
self._latest_req_id = 0 self._latest_req_id = 0
self._latest_req_id_lock = threading.RLock() self._latest_req_id_lock = threading.RLock()
backend = get_backend('music.snapcast') self._socks = {}
backend_hosts = backend.hosts if backend else [self.host] self._threads = {}
backend_ports = backend.ports if backend else [self.port] self._statuses = {}
self.backend_hosts = backend_hosts
self.backend_ports = backend_ports
def _get_req_id(self): @property
def host(self) -> str:
return self._hosts[0][0]
@property
def port(self) -> int:
if not getattr(self, '_hosts', None):
return self._DEFAULT_SNAPCAST_PORT
return self._hosts[0][1]
def _get_hosts(
self,
host: Optional[str] = None,
port: Optional[int] = None,
hosts: Optional[Iterable[dict]] = None,
) -> List[Tuple[str, int]]:
ret = []
if hosts:
assert all(
isinstance(h, dict) and h.get('host') for h in hosts
), f'Expected a list of dicts with host and port keys, got: {hosts}'
ret.extend((h['host'], h.get('port', self.port)) for h in hosts)
if host and port:
ret.insert(0, (host, port))
return list(dict.fromkeys(ret))
def _next_req_id(self):
with self._latest_req_id_lock: with self._latest_req_id_lock:
self._latest_req_id += 1 self._latest_req_id += 1
return self._latest_req_id return self._latest_req_id
def _connect(self, host: Optional[str] = None, port: Optional[int] = None): def _connect(self, host: str, port: int, reuse: bool = False):
if reuse and self._socks.get(host) and self._socks[host].fileno() >= 0:
return self._socks[host]
self.logger.debug('Connecting to %s:%d', host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.logger.info('Connecting to Snapcast host %s:%d', host, port) sock.connect((host, port))
sock.connect((host or self.host, port or self.port))
if reuse:
self._socks[host] = sock
self.logger.info('Connected to %s:%d', host, port)
return sock return sock
def _disconnect(self, host: str, port: int):
sock = self._socks.get(host)
if not sock:
self.logger.debug('Not connected to %s:%d', host, port)
return
try:
sock.close()
except Exception as e:
self.logger.warning(
'Exception while disconnecting from %s:%d: %s', host, port, e
)
finally:
self._socks[host] = None
@classmethod @classmethod
def _send(cls, sock: socket.socket, req: Union[dict, str, bytes]): def _send(cls, sock: socket.socket, req: Union[dict, str, bytes]):
if isinstance(req, dict): if isinstance(req, dict):
req = json.dumps(req) req = json.dumps(req)
if isinstance(req, str): if isinstance(req, str):
req = req.encode() req = req.encode()
if isinstance(req, bytes):
sock.send(req + cls._SOCKET_EOL) assert isinstance(
else: req, bytes
raise RuntimeError( ), f'Unsupported type {type(req)} for Snapcast request'
f'Unsupported type {type(req)} for Snapcast request: {req}' sock.send(req + cls._SOCKET_EOL)
)
@classmethod @classmethod
def _recv(cls, sock): def _recv_result(cls, sock: socket.socket):
msg = cls._recv(sock)
if not msg:
return None
return msg.get('result')
@classmethod
def _recv(cls, sock: socket.socket):
sock.setblocking(False)
buf = b'' buf = b''
while buf[-2:] != cls._SOCKET_EOL: while buf[-2:] != cls._SOCKET_EOL:
buf += sock.recv(1) ready = select.select([sock], [], [], 0.5)
return json.loads(buf.decode().strip()).get('result') if ready[0]:
ch = sock.recv(1)
if not ch:
raise ConnectionError('Connection reset by peer')
buf += ch
else:
return None
return json.loads(buf)
@classmethod
def _parse_event(cls, host, msg):
evt = None
if msg.get('method') == 'Client.OnVolumeChanged':
client_id = msg.get('params', {}).get('id')
volume = msg.get('params', {}).get('volume', {}).get('percent')
muted = msg.get('params', {}).get('volume', {}).get('muted')
evt = ClientVolumeChangeEvent(
host=host, client=client_id, volume=volume, muted=muted
)
elif msg.get('method') == 'Group.OnMute':
group_id = msg.get('params', {}).get('id')
muted = msg.get('params', {}).get('mute')
evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted)
elif msg.get('method') == 'Client.OnConnect':
client = msg.get('params', {}).get('client')
evt = ClientConnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnDisconnect':
client = msg.get('params', {}).get('client')
evt = ClientDisconnectedEvent(host=host, client=client)
elif msg.get('method') == 'Client.OnLatencyChanged':
client = msg.get('params', {}).get('id')
latency = msg.get('params', {}).get('latency')
evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency)
elif msg.get('method') == 'Client.OnNameChanged':
client = msg.get('params', {}).get('id')
name = msg.get('params', {}).get('name')
evt = ClientNameChangeEvent(host=host, client=client, name=name)
elif msg.get('method') == 'Group.OnStreamChanged':
group_id = msg.get('params', {}).get('id')
stream_id = msg.get('params', {}).get('stream_id')
evt = GroupStreamChangeEvent(host=host, group=group_id, stream=stream_id)
elif msg.get('method') == 'Stream.OnUpdate':
stream_id = msg.get('params', {}).get('stream_id')
stream = msg.get('params', {}).get('stream')
evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream)
elif msg.get('method') == 'Server.OnUpdate':
server = msg.get('params', {}).get('server')
evt = ServerUpdateEvent(host=host, server=server)
return evt
def _event_listener(self, host: str, port: int):
def _thread():
while not self.should_stop():
try:
sock = self._connect(host, port, reuse=True)
msgs = self._recv(sock)
if msgs is None:
continue
if not isinstance(msgs, list):
msgs = [msgs]
for msg in msgs:
self.logger.debug(
'Received message on %s:%d: %s', host, port, msg
)
evt = self._parse_event(host=host, msg=msg)
if evt:
get_bus().post(evt)
except Exception as e:
self.logger.warning(
'Exception while getting the status of the Snapcast server %s:%d: %s',
host,
port,
e,
)
self._disconnect(host, port)
finally:
if self.poll_interval:
time.sleep(self.poll_interval)
return _thread
def _get_group(self, sock: socket.socket, group: str): def _get_group(self, sock: socket.socket, group: str):
for g in self._status(sock).get('groups', []): for g in self._get_status(sock).get('groups', []):
if group == g.get('id') or group == g.get('name'): if group == g.get('id') or group == g.get('name'):
return g return g
return None return None
def _get_client(self, sock: socket.socket, client: str): def _get_client(self, sock: socket.socket, client: str):
for g in self._status(sock).get('groups', []): for g in self._get_status(sock).get('groups', []):
clients = g.get('clients', []) clients = g.get('clients', [])
for c in clients: for c in clients:
@ -94,15 +281,62 @@ class MusicSnapcastPlugin(Plugin):
return None return None
def _status(self, sock: socket.socket): def _get_status(self, sock: socket.socket) -> dict:
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Server.GetStatus', 'method': 'Server.GetStatus',
} }
self._send(sock, request) self._send(sock, request)
return (self._recv(sock) or {}).get('server', {}) return (self._recv_result(sock) or {}).get('server', {})
def _status(
self,
host: Optional[str] = None,
port: Optional[int] = None,
client: Optional[str] = None,
group: Optional[str] = None,
):
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
if client:
return self._get_client(sock, client)
if group:
return self._get_group(sock, group)
return self._get_status(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
def _status_response(
self,
host: str,
port: int,
client: Optional[str] = None,
group: Optional[str] = None,
):
sock = None
try:
sock = self._connect(host, port)
if client:
return self._get_client(sock, client)
if group:
return self._get_group(sock, group)
return {'host': host, 'port': port, **self._get_status(sock)}
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
@action @action
def status( def status(
@ -113,7 +347,10 @@ class MusicSnapcastPlugin(Plugin):
group: Optional[str] = None, group: Optional[str] = None,
): ):
""" """
Get the status either of a Snapcast server, client or group Get the current status of a Snapcast server, client or group.
If not host, client or group is specified, the action will return the
status of all the Snapcast servers.
:param host: Snapcast server to query (default: default configured host) :param host: Snapcast server to query (default: default configured host)
:param port: Snapcast server port (default: default configured port) :param port: Snapcast server port (default: default configured port)
@ -125,6 +362,8 @@ class MusicSnapcastPlugin(Plugin):
.. code-block:: json .. code-block:: json
"output": { "output": {
"host": "localhost",
"port": 1705,
"groups": [ "groups": [
{ {
"clients": [ "clients": [
@ -205,22 +444,20 @@ class MusicSnapcastPlugin(Plugin):
""" """
sock = None if client or group or host:
return self._status_response(
host=host or self.host,
port=port or self.port,
client=client,
group=group,
)
try: # Run status in parallel on all the hosts and return a list with all the
sock = self._connect(host or self.host, port or self.port) # results
if client: with ThreadPoolExecutor(max_workers=len(self._hosts)) as executor:
return self._get_client(sock, client) return list(
if group: executor.map(lambda h: self._status_response(h[0], h[1]), self._hosts)
return self._get_group(sock, group) )
return self._status(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close: %s', e)
@action @action
def mute( def mute(
@ -242,15 +479,13 @@ class MusicSnapcastPlugin(Plugin):
:param port: Snapcast server port (default: default configured port) :param port: Snapcast server port (default: default configured port)
""" """
if not (client and group): assert client or group, 'Please specify either a client or a group'
raise RuntimeError('Please specify either a client or a group')
sock = None sock = None
try: try:
sock = self._connect(host or self.host, port or self.port) sock = self._connect(host or self.host, port or self.port)
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Group.SetMute' if group else 'Client.SetVolume', 'method': 'Group.SetMute' if group else 'Client.SetVolume',
'params': {}, 'params': {},
@ -276,7 +511,7 @@ class MusicSnapcastPlugin(Plugin):
) )
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -305,17 +540,17 @@ class MusicSnapcastPlugin(Plugin):
:param port: Snapcast server port (default: default configured port) :param port: Snapcast server port (default: default configured port)
""" """
if volume is None and delta is None and mute is None: assert not (volume is None and delta is None and mute is None), (
raise RuntimeError( 'Please specify either an absolute volume, a relative delta or '
'Please specify either an absolute volume or ' + 'relative delta' + 'a mute status'
) )
sock = None sock = None
try: try:
sock = self._connect(host or self.host, port or self.port) sock = self._connect(host or self.host, port or self.port)
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Client.SetVolume', 'method': 'Client.SetVolume',
'params': {}, 'params': {},
@ -340,7 +575,7 @@ class MusicSnapcastPlugin(Plugin):
request['params']['volume']['percent'] = volume request['params']['volume']['percent'] = volume
request['params']['volume']['muted'] = mute request['params']['volume']['muted'] = mute
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -370,7 +605,7 @@ class MusicSnapcastPlugin(Plugin):
try: try:
sock = self._connect(host or self.host, port or self.port) sock = self._connect(host or self.host, port or self.port)
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Client.SetName', 'method': 'Client.SetName',
'params': {}, 'params': {},
@ -381,7 +616,7 @@ class MusicSnapcastPlugin(Plugin):
request['params']['id'] = c['id'] request['params']['id'] = c['id']
request['params']['name'] = name request['params']['name'] = name
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -411,7 +646,7 @@ class MusicSnapcastPlugin(Plugin):
try: try:
sock = self._connect(host or self.host, port or self.port) sock = self._connect(host or self.host, port or self.port)
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Group.SetName', 'method': 'Group.SetName',
'params': { 'params': {
@ -421,7 +656,7 @@ class MusicSnapcastPlugin(Plugin):
} }
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -451,7 +686,7 @@ class MusicSnapcastPlugin(Plugin):
try: try:
sock = self._connect(host or self.host, port or self.port) sock = self._connect(host or self.host, port or self.port)
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Client.SetLatency', 'method': 'Client.SetLatency',
'params': {'latency': latency}, 'params': {'latency': latency},
@ -461,7 +696,7 @@ class MusicSnapcastPlugin(Plugin):
assert c, f'No such client: {client}' assert c, f'No such client: {client}'
request['params']['id'] = c['id'] request['params']['id'] = c['id']
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -486,7 +721,7 @@ class MusicSnapcastPlugin(Plugin):
try: try:
sock = self._connect(host or self.host, port or self.port) sock = self._connect(host or self.host, port or self.port)
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Server.DeleteClient', 'method': 'Server.DeleteClient',
'params': {}, 'params': {},
@ -496,7 +731,7 @@ class MusicSnapcastPlugin(Plugin):
assert c, f'No such client: {client}' assert c, f'No such client: {client}'
request['params']['id'] = c['id'] request['params']['id'] = c['id']
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -528,7 +763,7 @@ class MusicSnapcastPlugin(Plugin):
g = self._get_group(sock, group) g = self._get_group(sock, group)
assert g, f'No such group: {group}' assert g, f'No such group: {group}'
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Group.SetClients', 'method': 'Group.SetClients',
'params': {'id': g['id'], 'clients': []}, 'params': {'id': g['id'], 'clients': []},
@ -540,7 +775,7 @@ class MusicSnapcastPlugin(Plugin):
request['params']['clients'].append(c['id']) request['params']['clients'].append(c['id'])
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -572,7 +807,7 @@ class MusicSnapcastPlugin(Plugin):
g = self._get_group(sock, group) g = self._get_group(sock, group)
assert g, f'No such group: {group}' assert g, f'No such group: {group}'
request = { request = {
'id': self._get_req_id(), 'id': self._next_req_id(),
'jsonrpc': '2.0', 'jsonrpc': '2.0',
'method': 'Group.SetStream', 'method': 'Group.SetStream',
'params': { 'params': {
@ -582,7 +817,7 @@ class MusicSnapcastPlugin(Plugin):
} }
self._send(sock, request) self._send(sock, request)
return self._recv(sock) return self._recv_result(sock)
finally: finally:
try: try:
if sock: if sock:
@ -590,25 +825,19 @@ class MusicSnapcastPlugin(Plugin):
except Exception as e: except Exception as e:
self.logger.warning('Error on socket close: %s', e) self.logger.warning('Error on socket close: %s', e)
@action
def get_backend_hosts(self):
"""
:return: A dict with the Snapcast hosts configured on the backend
in the format ``host -> port``.
"""
return {
host: self.backend_ports[i] for i, host in enumerate(self.backend_hosts)
}
@action @action
def get_playing_streams(self, exclude_local: bool = False): def get_playing_streams(self, exclude_local: bool = False):
""" """
Returns the remote streams configured in the `music.snapcast` backend Returns the configured remote streams that are currently active and
that are currently active and unmuted. unmuted.
.. warning:: This action is deprecated and mostly kept for
backward-compatibility purposes, as it doesn't allow the case where
multiple Snapcast instances can be running on the same host, nor it
provides additional information other that something is playing on a
certain host and port. Use :meth:`.status` instead.
:param exclude_local: Exclude localhost connections (default: False) :param exclude_local: Exclude localhost connections (default: False)
:returns: dict with the host->port mapping. Example: :returns: dict with the host->port mapping. Example:
.. code-block:: json .. code-block:: json
@ -623,20 +852,20 @@ class MusicSnapcastPlugin(Plugin):
""" """
backend_hosts: dict = self.get_backend_hosts().output # type: ignore
playing_hosts = {} playing_hosts = {}
def _worker(host, port): def _worker(host: str, port: int):
try: try:
if exclude_local and ( if exclude_local and (
host == 'localhost' or host == Config.get('device_id') host == 'localhost' or host == Config.get('device_id')
): ):
return return
server_status: dict = self.status(host=host, port=port).output # type: ignore server_status = self._status(host=host, port=port) or {}
client_status: dict = self.status( # type: ignore device_id: str = Config.get('device_id') # type: ignore
host=host, port=port, client=Config.get('device_id') client_status = (
).output self._status(host=host, port=port, client=device_id) or {}
)
if client_status.get('config', {}).get('volume', {}).get('muted'): if client_status.get('config', {}).get('volume', {}).get('muted'):
return return
@ -674,7 +903,7 @@ class MusicSnapcastPlugin(Plugin):
workers = [] workers = []
for host, port in backend_hosts.items(): for host, port in self._hosts:
w = threading.Thread(target=_worker, args=(host, port)) w = threading.Thread(target=_worker, args=(host, port))
w.start() w.start()
workers.append(w) workers.append(w)
@ -685,5 +914,36 @@ class MusicSnapcastPlugin(Plugin):
return {'hosts': playing_hosts} return {'hosts': playing_hosts}
def main(self):
while not self.should_stop():
for host, port in self._hosts:
thread_name = f'Snapcast-{host}-{port}'
self._threads[host] = threading.Thread(
target=self._event_listener(host, port), name=thread_name
)
self._threads[host].start()
for thread in self._threads.values():
thread.join()
self._threads = {}
def stop(self):
for host, sock in self._socks.items():
if sock:
try:
sock.close()
except Exception as e:
self.logger.warning(
'Could not close Snapcast connection to %s: %s: %s',
host,
type(e),
e,
)
super().stop()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: