diff --git a/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Client.vue b/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Client.vue index b52534b903..7d46b6f14d 100644 --- a/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Client.vue +++ b/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Client.vue @@ -103,5 +103,9 @@ export default { cursor: pointer; } } + + .slider-container { + padding-right: 1em; + } } diff --git a/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Index.vue b/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Index.vue index 821eeff2fc..374d8625d5 100644 --- a/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Index.vue +++ b/platypush/backend/http/webapp/src/components/panels/MusicSnapcast/Index.vue @@ -78,7 +78,13 @@ export default { methods: { parseServerStatus(status) { - status.server.host.port = this.ports[status.server.host.name] + status.server = status.server || { + host: status.server.host || { + name: status.host + } + } + + status.server.host.port = this.ports[status.host] this.hosts[status.server.host.name] = { ...status, groups: status.groups.map((group) => { @@ -105,14 +111,11 @@ export default { this.loading = true try { - const hosts = await this.request('music.snapcast.get_backend_hosts') - const statuses = await Promise.all(Object.keys(hosts).map( - async (host) => this.request('music.snapcast.status', {host: host, port: hosts[host]}) - )) - + const statuses = await this.request('music.snapcast.status') this.hosts = {} + statuses.forEach((status) => { - this.ports[status.server.host.name] = hosts[status.server.host.name] + this.ports[status.host] = status.port this.parseServerStatus(status) }) } finally { @@ -497,4 +500,18 @@ export default { } } } + +:deep(.modal-container) { + .modal { + .content { + @include until($tablet) { + width: 95vw; + } + + @include from($tablet) { + min-width: 600px; + } + } + } +} diff --git a/platypush/backend/music/snapcast/__init__.py b/platypush/backend/music/snapcast/__init__.py deleted file mode 100644 index 509776e00b..0000000000 --- a/platypush/backend/music/snapcast/__init__.py +++ /dev/null @@ -1,227 +0,0 @@ -import json -import select -import socket -import threading -import time - -from platypush.backend import Backend -from platypush.message.event.music.snapcast import ( - ClientVolumeChangeEvent, - GroupMuteChangeEvent, - ClientConnectedEvent, - ClientDisconnectedEvent, - ClientLatencyChangeEvent, - ClientNameChangeEvent, - GroupStreamChangeEvent, - StreamUpdateEvent, - ServerUpdateEvent, -) - - -class MusicSnapcastBackend(Backend): - """ - Backend that listens for notification and status changes on one or more - `Snapcast `_ servers. - """ - - _DEFAULT_SNAPCAST_PORT = 1705 - _DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds - _SOCKET_EOL = '\r\n'.encode() - - def __init__( - self, - hosts=None, - ports=None, - poll_seconds=_DEFAULT_POLL_SECONDS, - *args, - **kwargs, - ): - """ - :param hosts: List of Snapcast server names or IPs to monitor (default: - `['localhost']` - :type hosts: list[str] - - :param ports: List of control ports for the configured Snapcast servers - (default: `[1705]`) - :type ports: list[int] - - :param poll_seconds: How often the backend will poll remote servers for - status updated (default: 10 seconds) - :type poll_seconds: float - """ - - super().__init__(*args, **kwargs) - - if hosts is None: - hosts = ['localhost'] - if ports is None: - ports = [self._DEFAULT_SNAPCAST_PORT] - - self.hosts = hosts[:] - self.ports = ports[:] - self.poll_seconds = poll_seconds - self._socks = {} - self._threads = {} - self._statuses = {} - - if len(hosts) > len(ports): - for _ in range(len(ports), len(hosts)): - self.ports.append(self._DEFAULT_SNAPCAST_PORT) - - def _connect(self, host, port): - if self._socks.get(host): - return self._socks[host] - - self.logger.debug('Connecting to %s:%d', host, port) - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - self._socks[host] = sock - self.logger.info('Connected to %s:%d', host, port) - return sock - - def _disconnect(self, host, port): - sock = self._socks.get(host) - if not sock: - self.logger.debug('Not connected to %s:%d', host, port) - return - - try: - sock.close() - except Exception as e: - self.logger.warning( - 'Exception while disconnecting from %s:%d: %s', host, port, e - ) - finally: - self._socks[host] = None - - @classmethod - def _recv(cls, sock): - sock.setblocking(0) - buf = b'' - - while buf[-2:] != cls._SOCKET_EOL: - ready = select.select([sock], [], [], 0.5) - if ready[0]: - buf += sock.recv(1) - else: - return None - - return json.loads(buf.decode().strip()) - - @classmethod - def _parse_msg(cls, host, msg): - evt = None - - if msg.get('method') == 'Client.OnVolumeChanged': - client_id = msg.get('params', {}).get('id') - volume = msg.get('params', {}).get('volume', {}).get('percent') - muted = msg.get('params', {}).get('volume', {}).get('muted') - evt = ClientVolumeChangeEvent( - host=host, client=client_id, volume=volume, muted=muted - ) - elif msg.get('method') == 'Group.OnMute': - group_id = msg.get('params', {}).get('id') - muted = msg.get('params', {}).get('mute') - evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted) - elif msg.get('method') == 'Client.OnConnect': - client = msg.get('params', {}).get('client') - evt = ClientConnectedEvent(host=host, client=client) - elif msg.get('method') == 'Client.OnDisconnect': - client = msg.get('params', {}).get('client') - evt = ClientDisconnectedEvent(host=host, client=client) - elif msg.get('method') == 'Client.OnLatencyChanged': - client = msg.get('params', {}).get('id') - latency = msg.get('params', {}).get('latency') - evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency) - elif msg.get('method') == 'Client.OnNameChanged': - client = msg.get('params', {}).get('id') - name = msg.get('params', {}).get('name') - evt = ClientNameChangeEvent(host=host, client=client, name=name) - elif msg.get('method') == 'Group.OnStreamChanged': - group_id = msg.get('params', {}).get('id') - stream_id = msg.get('params', {}).get('stream_id') - evt = GroupStreamChangeEvent(host=host, group=group_id, stream=stream_id) - elif msg.get('method') == 'Stream.OnUpdate': - stream_id = msg.get('params', {}).get('stream_id') - stream = msg.get('params', {}).get('stream') - evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream) - elif msg.get('method') == 'Server.OnUpdate': - server = msg.get('params', {}).get('server') - evt = ServerUpdateEvent(host=host, server=server) - - return evt - - def _client(self, host, port): - def _thread(): - while not self.should_stop(): - try: - sock = self._connect(host, port) - msgs = self._recv(sock) - - if msgs is None: - continue - if not isinstance(msgs, list): - msgs = [msgs] - - for msg in msgs: - self.logger.debug( - 'Received message on {host}:{port}: {msg}'.format( - host=host, port=port, msg=msg - ) - ) - - evt = self._parse_msg(host=host, msg=msg) - if evt: - self.bus.post(evt) - except Exception as e: - self.logger.warning( - 'Exception while getting the status ' - + 'of the Snapcast server {}:{}: {}'.format(host, port, str(e)) - ) - - self._disconnect(host, port) - finally: - time.sleep(self.poll_seconds) - - return _thread - - def run(self): - super().run() - - self.logger.info( - 'Initialized Snapcast backend - hosts: {} ports: {}'.format( - self.hosts, self.ports - ) - ) - - while not self.should_stop(): - for i, host in enumerate(self.hosts): - port = self.ports[i] - thread_name = f'Snapcast-{host}-{port}' - - self._threads[host] = threading.Thread( - target=self._client(host, port), name=thread_name - ) - - self._threads[host].start() - - for host in self.hosts: - self._threads[host].join() - - self.logger.info('Snapcast backend terminated') - - def on_stop(self): - self.logger.info('Received STOP event on the Snapcast backend') - for host, sock in self._socks.items(): - if sock: - try: - sock.close() - except Exception as e: - self.logger.warning( - 'Could not close Snapcast connection to {}: {}: {}'.format( - host, type(e), str(e) - ) - ) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/music/snapcast/manifest.yaml b/platypush/backend/music/snapcast/manifest.yaml deleted file mode 100644 index 509f12568c..0000000000 --- a/platypush/backend/music/snapcast/manifest.yaml +++ /dev/null @@ -1,15 +0,0 @@ -manifest: - events: - platypush.message.event.music.snapcast.ClientConnectedEvent: '' - platypush.message.event.music.snapcast.ClientDisconnectedEvent: '' - platypush.message.event.music.snapcast.ClientLatencyChangeEvent: '' - platypush.message.event.music.snapcast.ClientNameChangeEvent: '' - platypush.message.event.music.snapcast.ClientVolumeChangeEvent: '' - platypush.message.event.music.snapcast.GroupMuteChangeEvent: '' - platypush.message.event.music.snapcast.GroupStreamChangeEvent: '' - platypush.message.event.music.snapcast.ServerUpdateEvent: '' - platypush.message.event.music.snapcast.StreamUpdateEvent: '' - install: - pip: [] - package: platypush.backend.music.snapcast - type: backend diff --git a/platypush/plugins/music/snapcast/__init__.py b/platypush/plugins/music/snapcast/__init__.py index 5a22f7645d..d30cc3ce14 100644 --- a/platypush/plugins/music/snapcast/__init__.py +++ b/platypush/plugins/music/snapcast/__init__.py @@ -1,14 +1,28 @@ import json +import select import socket import threading -from typing import Collection, Optional, Union +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Collection, Iterable, List, Optional, Tuple, Union from platypush.config import Config -from platypush.context import get_backend -from platypush.plugins import Plugin, action +from platypush.context import get_bus +from platypush.message.event.music.snapcast import ( + ClientVolumeChangeEvent, + GroupMuteChangeEvent, + ClientConnectedEvent, + ClientDisconnectedEvent, + ClientLatencyChangeEvent, + ClientNameChangeEvent, + GroupStreamChangeEvent, + StreamUpdateEvent, + ServerUpdateEvent, +) +from platypush.plugins import RunnablePlugin, action -class MusicSnapcastPlugin(Plugin): +class MusicSnapcastPlugin(RunnablePlugin): """ Plugin to interact with a `Snapcast `_ instance, control clients mute status, volume, playback etc. @@ -18,68 +32,241 @@ class MusicSnapcastPlugin(Plugin): """ _DEFAULT_SNAPCAST_PORT = 1705 + _DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds _SOCKET_EOL = '\r\n'.encode() def __init__( - self, host: str = 'localhost', port: int = _DEFAULT_SNAPCAST_PORT, **kwargs + self, + host: Optional[str] = None, + port: int = _DEFAULT_SNAPCAST_PORT, + hosts: Optional[Iterable[dict]] = None, + poll_interval: Optional[float] = _DEFAULT_POLL_SECONDS, + **kwargs, ): """ - :param host: Default Snapcast server host (default: localhost) - :param port: Default Snapcast server control port (default: 1705) - """ - super().__init__(**kwargs) + :param host: Default Snapcast server host. + :param port: Default Snapcast server control port (default: 1705). + :param hosts: If specified, then the provided list of Snapcast servers + will be monitored, rather than just the one provided on ``host``. + This setting can be used either in conjunction with ``host`` (in + that case, if the ``host`` is not specified on a request then + ``host`` will be used as a fallback), or on its own (in that case + requests with no host specified will target the first server in the + list). Note however that either ``host`` or ``hosts`` must be + provided. Format: - self.host = host - self.port = port + .. code-block:: yaml + + hosts: + - host: localhost + port: 1705 # Default port + + - host: snapcast.example.com + port: 9999 + + :param poll_seconds: How often the plugin will poll remote servers for + status updates (default: 10 seconds). + """ + super().__init__(poll_interval=poll_interval, **kwargs) + + self._hosts = self._get_hosts(host=host, port=port, hosts=hosts) + assert self._hosts, 'No Snapcast hosts specified' self._latest_req_id = 0 self._latest_req_id_lock = threading.RLock() - backend = get_backend('music.snapcast') - backend_hosts = backend.hosts if backend else [self.host] - backend_ports = backend.ports if backend else [self.port] - self.backend_hosts = backend_hosts - self.backend_ports = backend_ports + self._socks = {} + self._threads = {} + self._statuses = {} - def _get_req_id(self): + @property + def host(self) -> str: + return self._hosts[0][0] + + @property + def port(self) -> int: + if not getattr(self, '_hosts', None): + return self._DEFAULT_SNAPCAST_PORT + + return self._hosts[0][1] + + def _get_hosts( + self, + host: Optional[str] = None, + port: Optional[int] = None, + hosts: Optional[Iterable[dict]] = None, + ) -> List[Tuple[str, int]]: + ret = [] + if hosts: + assert all( + isinstance(h, dict) and h.get('host') for h in hosts + ), f'Expected a list of dicts with host and port keys, got: {hosts}' + + ret.extend((h['host'], h.get('port', self.port)) for h in hosts) + + if host and port: + ret.insert(0, (host, port)) + + return list(dict.fromkeys(ret)) + + def _next_req_id(self): with self._latest_req_id_lock: self._latest_req_id += 1 return self._latest_req_id - def _connect(self, host: Optional[str] = None, port: Optional[int] = None): + def _connect(self, host: str, port: int, reuse: bool = False): + if reuse and self._socks.get(host) and self._socks[host].fileno() >= 0: + return self._socks[host] + + self.logger.debug('Connecting to %s:%d', host, port) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.logger.info('Connecting to Snapcast host %s:%d', host, port) - sock.connect((host or self.host, port or self.port)) + sock.connect((host, port)) + + if reuse: + self._socks[host] = sock + + self.logger.info('Connected to %s:%d', host, port) return sock + def _disconnect(self, host: str, port: int): + sock = self._socks.get(host) + if not sock: + self.logger.debug('Not connected to %s:%d', host, port) + return + + try: + sock.close() + except Exception as e: + self.logger.warning( + 'Exception while disconnecting from %s:%d: %s', host, port, e + ) + finally: + self._socks[host] = None + @classmethod def _send(cls, sock: socket.socket, req: Union[dict, str, bytes]): if isinstance(req, dict): req = json.dumps(req) if isinstance(req, str): req = req.encode() - if isinstance(req, bytes): - sock.send(req + cls._SOCKET_EOL) - else: - raise RuntimeError( - f'Unsupported type {type(req)} for Snapcast request: {req}' - ) + + assert isinstance( + req, bytes + ), f'Unsupported type {type(req)} for Snapcast request' + sock.send(req + cls._SOCKET_EOL) @classmethod - def _recv(cls, sock): + def _recv_result(cls, sock: socket.socket): + msg = cls._recv(sock) + if not msg: + return None + + return msg.get('result') + + @classmethod + def _recv(cls, sock: socket.socket): + sock.setblocking(False) buf = b'' + while buf[-2:] != cls._SOCKET_EOL: - buf += sock.recv(1) - return json.loads(buf.decode().strip()).get('result') + ready = select.select([sock], [], [], 0.5) + if ready[0]: + ch = sock.recv(1) + if not ch: + raise ConnectionError('Connection reset by peer') + + buf += ch + else: + return None + + return json.loads(buf) + + @classmethod + def _parse_event(cls, host, msg): + evt = None + + if msg.get('method') == 'Client.OnVolumeChanged': + client_id = msg.get('params', {}).get('id') + volume = msg.get('params', {}).get('volume', {}).get('percent') + muted = msg.get('params', {}).get('volume', {}).get('muted') + evt = ClientVolumeChangeEvent( + host=host, client=client_id, volume=volume, muted=muted + ) + elif msg.get('method') == 'Group.OnMute': + group_id = msg.get('params', {}).get('id') + muted = msg.get('params', {}).get('mute') + evt = GroupMuteChangeEvent(host=host, group=group_id, muted=muted) + elif msg.get('method') == 'Client.OnConnect': + client = msg.get('params', {}).get('client') + evt = ClientConnectedEvent(host=host, client=client) + elif msg.get('method') == 'Client.OnDisconnect': + client = msg.get('params', {}).get('client') + evt = ClientDisconnectedEvent(host=host, client=client) + elif msg.get('method') == 'Client.OnLatencyChanged': + client = msg.get('params', {}).get('id') + latency = msg.get('params', {}).get('latency') + evt = ClientLatencyChangeEvent(host=host, client=client, latency=latency) + elif msg.get('method') == 'Client.OnNameChanged': + client = msg.get('params', {}).get('id') + name = msg.get('params', {}).get('name') + evt = ClientNameChangeEvent(host=host, client=client, name=name) + elif msg.get('method') == 'Group.OnStreamChanged': + group_id = msg.get('params', {}).get('id') + stream_id = msg.get('params', {}).get('stream_id') + evt = GroupStreamChangeEvent(host=host, group=group_id, stream=stream_id) + elif msg.get('method') == 'Stream.OnUpdate': + stream_id = msg.get('params', {}).get('stream_id') + stream = msg.get('params', {}).get('stream') + evt = StreamUpdateEvent(host=host, stream_id=stream_id, stream=stream) + elif msg.get('method') == 'Server.OnUpdate': + server = msg.get('params', {}).get('server') + evt = ServerUpdateEvent(host=host, server=server) + + return evt + + def _event_listener(self, host: str, port: int): + def _thread(): + while not self.should_stop(): + try: + sock = self._connect(host, port, reuse=True) + msgs = self._recv(sock) + + if msgs is None: + continue + if not isinstance(msgs, list): + msgs = [msgs] + + for msg in msgs: + self.logger.debug( + 'Received message on %s:%d: %s', host, port, msg + ) + + evt = self._parse_event(host=host, msg=msg) + if evt: + get_bus().post(evt) + except Exception as e: + self.logger.warning( + 'Exception while getting the status of the Snapcast server %s:%d: %s', + host, + port, + e, + ) + + self._disconnect(host, port) + finally: + if self.poll_interval: + time.sleep(self.poll_interval) + + return _thread def _get_group(self, sock: socket.socket, group: str): - for g in self._status(sock).get('groups', []): + for g in self._get_status(sock).get('groups', []): if group == g.get('id') or group == g.get('name'): return g return None def _get_client(self, sock: socket.socket, client: str): - for g in self._status(sock).get('groups', []): + for g in self._get_status(sock).get('groups', []): clients = g.get('clients', []) for c in clients: @@ -94,15 +281,62 @@ class MusicSnapcastPlugin(Plugin): return None - def _status(self, sock: socket.socket): + def _get_status(self, sock: socket.socket) -> dict: request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Server.GetStatus', } self._send(sock, request) - return (self._recv(sock) or {}).get('server', {}) + return (self._recv_result(sock) or {}).get('server', {}) + + def _status( + self, + host: Optional[str] = None, + port: Optional[int] = None, + client: Optional[str] = None, + group: Optional[str] = None, + ): + sock = None + + try: + sock = self._connect(host or self.host, port or self.port) + if client: + return self._get_client(sock, client) + if group: + return self._get_group(sock, group) + + return self._get_status(sock) + finally: + try: + if sock: + sock.close() + except Exception as e: + self.logger.warning('Error on socket close: %s', e) + + def _status_response( + self, + host: str, + port: int, + client: Optional[str] = None, + group: Optional[str] = None, + ): + sock = None + try: + sock = self._connect(host, port) + if client: + return self._get_client(sock, client) + if group: + return self._get_group(sock, group) + + return {'host': host, 'port': port, **self._get_status(sock)} + finally: + try: + if sock: + sock.close() + except Exception as e: + self.logger.warning('Error on socket close: %s', e) @action def status( @@ -113,7 +347,10 @@ class MusicSnapcastPlugin(Plugin): group: Optional[str] = None, ): """ - Get the status either of a Snapcast server, client or group + Get the current status of a Snapcast server, client or group. + + If not host, client or group is specified, the action will return the + status of all the Snapcast servers. :param host: Snapcast server to query (default: default configured host) :param port: Snapcast server port (default: default configured port) @@ -125,6 +362,8 @@ class MusicSnapcastPlugin(Plugin): .. code-block:: json "output": { + "host": "localhost", + "port": 1705, "groups": [ { "clients": [ @@ -205,22 +444,20 @@ class MusicSnapcastPlugin(Plugin): """ - sock = None + if client or group or host: + return self._status_response( + host=host or self.host, + port=port or self.port, + client=client, + group=group, + ) - try: - sock = self._connect(host or self.host, port or self.port) - if client: - return self._get_client(sock, client) - if group: - return self._get_group(sock, group) - - return self._status(sock) - finally: - try: - if sock: - sock.close() - except Exception as e: - self.logger.warning('Error on socket close: %s', e) + # Run status in parallel on all the hosts and return a list with all the + # results + with ThreadPoolExecutor(max_workers=len(self._hosts)) as executor: + return list( + executor.map(lambda h: self._status_response(h[0], h[1]), self._hosts) + ) @action def mute( @@ -242,15 +479,13 @@ class MusicSnapcastPlugin(Plugin): :param port: Snapcast server port (default: default configured port) """ - if not (client and group): - raise RuntimeError('Please specify either a client or a group') - + assert client or group, 'Please specify either a client or a group' sock = None try: sock = self._connect(host or self.host, port or self.port) request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Group.SetMute' if group else 'Client.SetVolume', 'params': {}, @@ -276,7 +511,7 @@ class MusicSnapcastPlugin(Plugin): ) self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -305,17 +540,17 @@ class MusicSnapcastPlugin(Plugin): :param port: Snapcast server port (default: default configured port) """ - if volume is None and delta is None and mute is None: - raise RuntimeError( - 'Please specify either an absolute volume or ' + 'relative delta' - ) + assert not (volume is None and delta is None and mute is None), ( + 'Please specify either an absolute volume, a relative delta or ' + + 'a mute status' + ) sock = None try: sock = self._connect(host or self.host, port or self.port) request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Client.SetVolume', 'params': {}, @@ -340,7 +575,7 @@ class MusicSnapcastPlugin(Plugin): request['params']['volume']['percent'] = volume request['params']['volume']['muted'] = mute self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -370,7 +605,7 @@ class MusicSnapcastPlugin(Plugin): try: sock = self._connect(host or self.host, port or self.port) request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Client.SetName', 'params': {}, @@ -381,7 +616,7 @@ class MusicSnapcastPlugin(Plugin): request['params']['id'] = c['id'] request['params']['name'] = name self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -411,7 +646,7 @@ class MusicSnapcastPlugin(Plugin): try: sock = self._connect(host or self.host, port or self.port) request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Group.SetName', 'params': { @@ -421,7 +656,7 @@ class MusicSnapcastPlugin(Plugin): } self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -451,7 +686,7 @@ class MusicSnapcastPlugin(Plugin): try: sock = self._connect(host or self.host, port or self.port) request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Client.SetLatency', 'params': {'latency': latency}, @@ -461,7 +696,7 @@ class MusicSnapcastPlugin(Plugin): assert c, f'No such client: {client}' request['params']['id'] = c['id'] self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -486,7 +721,7 @@ class MusicSnapcastPlugin(Plugin): try: sock = self._connect(host or self.host, port or self.port) request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Server.DeleteClient', 'params': {}, @@ -496,7 +731,7 @@ class MusicSnapcastPlugin(Plugin): assert c, f'No such client: {client}' request['params']['id'] = c['id'] self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -528,7 +763,7 @@ class MusicSnapcastPlugin(Plugin): g = self._get_group(sock, group) assert g, f'No such group: {group}' request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Group.SetClients', 'params': {'id': g['id'], 'clients': []}, @@ -540,7 +775,7 @@ class MusicSnapcastPlugin(Plugin): request['params']['clients'].append(c['id']) self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -572,7 +807,7 @@ class MusicSnapcastPlugin(Plugin): g = self._get_group(sock, group) assert g, f'No such group: {group}' request = { - 'id': self._get_req_id(), + 'id': self._next_req_id(), 'jsonrpc': '2.0', 'method': 'Group.SetStream', 'params': { @@ -582,7 +817,7 @@ class MusicSnapcastPlugin(Plugin): } self._send(sock, request) - return self._recv(sock) + return self._recv_result(sock) finally: try: if sock: @@ -590,25 +825,19 @@ class MusicSnapcastPlugin(Plugin): except Exception as e: self.logger.warning('Error on socket close: %s', e) - @action - def get_backend_hosts(self): - """ - :return: A dict with the Snapcast hosts configured on the backend - in the format ``host -> port``. - """ - - return { - host: self.backend_ports[i] for i, host in enumerate(self.backend_hosts) - } - @action def get_playing_streams(self, exclude_local: bool = False): """ - Returns the remote streams configured in the `music.snapcast` backend - that are currently active and unmuted. + Returns the configured remote streams that are currently active and + unmuted. + + .. warning:: This action is deprecated and mostly kept for + backward-compatibility purposes, as it doesn't allow the case where + multiple Snapcast instances can be running on the same host, nor it + provides additional information other that something is playing on a + certain host and port. Use :meth:`.status` instead. :param exclude_local: Exclude localhost connections (default: False) - :returns: dict with the host->port mapping. Example: .. code-block:: json @@ -623,20 +852,20 @@ class MusicSnapcastPlugin(Plugin): """ - backend_hosts: dict = self.get_backend_hosts().output # type: ignore playing_hosts = {} - def _worker(host, port): + def _worker(host: str, port: int): try: if exclude_local and ( host == 'localhost' or host == Config.get('device_id') ): return - server_status: dict = self.status(host=host, port=port).output # type: ignore - client_status: dict = self.status( # type: ignore - host=host, port=port, client=Config.get('device_id') - ).output + server_status = self._status(host=host, port=port) or {} + device_id: str = Config.get('device_id') # type: ignore + client_status = ( + self._status(host=host, port=port, client=device_id) or {} + ) if client_status.get('config', {}).get('volume', {}).get('muted'): return @@ -674,7 +903,7 @@ class MusicSnapcastPlugin(Plugin): workers = [] - for host, port in backend_hosts.items(): + for host, port in self._hosts: w = threading.Thread(target=_worker, args=(host, port)) w.start() workers.append(w) @@ -685,5 +914,36 @@ class MusicSnapcastPlugin(Plugin): return {'hosts': playing_hosts} + def main(self): + while not self.should_stop(): + for host, port in self._hosts: + thread_name = f'Snapcast-{host}-{port}' + + self._threads[host] = threading.Thread( + target=self._event_listener(host, port), name=thread_name + ) + + self._threads[host].start() + + for thread in self._threads.values(): + thread.join() + + self._threads = {} + + def stop(self): + for host, sock in self._socks.items(): + if sock: + try: + sock.close() + except Exception as e: + self.logger.warning( + 'Could not close Snapcast connection to %s: %s: %s', + host, + type(e), + e, + ) + + super().stop() + # vim:sw=4:ts=4:et: