diff --git a/platypush/plugins/media/mplayer/__init__.py b/platypush/plugins/media/mplayer/__init__.py
index 79c1fdfa3..5e990accb 100644
--- a/platypush/plugins/media/mplayer/__init__.py
+++ b/platypush/plugins/media/mplayer/__init__.py
@@ -4,6 +4,7 @@ import select
import subprocess
import threading
import time
+from typing import Any, Collection, Dict, List, Optional
from platypush.context import get_bus
from platypush.message.response import Response
@@ -39,10 +40,9 @@ class MediaMplayerPlugin(MediaPlugin):
def __init__(
self,
- mplayer_bin=None,
- mplayer_timeout=_mplayer_default_communicate_timeout,
- args=None,
- *argv,
+ mplayer_bin: Optional[str] = None,
+ mplayer_timeout: float = _mplayer_default_communicate_timeout,
+ args: Optional[Collection[str]] = None,
**kwargs,
):
"""
@@ -52,21 +52,13 @@ class MediaMplayerPlugin(MediaPlugin):
:param mplayer_bin: Path to the MPlayer executable (default: search for
the first occurrence in your system PATH environment variable)
- :type mplayer_bin: str
-
:param mplayer_timeout: Timeout in seconds to wait for more data
from MPlayer before considering a response ready (default: 0.5 seconds)
- :type mplayer_timeout: float
-
- :param subtitles: Path to the subtitles file
- :type subtitles: str
-
:param args: Default arguments that will be passed to the MPlayer
executable
- :type args: list
"""
- super().__init__(*argv, **kwargs)
+ super().__init__(**kwargs)
self.args = args or []
self._init_mplayer_bin(mplayer_bin=mplayer_bin)
@@ -106,17 +98,15 @@ class MediaMplayerPlugin(MediaPlugin):
try:
self._player.terminate()
except Exception as e:
- self.logger.debug(
- 'Failed to quit mplayer before _exec: {}'.format(str(e))
- )
+ self.logger.debug('Failed to quit mplayer before _exec: %s', e)
- mplayer_args = mplayer_args or []
+ m_args = mplayer_args or []
args = [self.mplayer_bin] + self._mplayer_bin_default_args
- for arg in self.args + mplayer_args:
+ for arg in (*self.args, *m_args):
if arg not in args:
args.append(arg)
- popen_args = {
+ popen_args: Dict[str, Any] = {
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
}
@@ -140,10 +130,13 @@ class MediaMplayerPlugin(MediaPlugin):
def args_pprint(txt):
lc = txt.lower()
if lc[0] == '[':
- return '%s=None' % lc[1:-1]
+ return f'{lc[1:-1]}=None'
return lc
while True:
+ if not mplayer.stdout:
+ break
+
line = mplayer.stdout.readline()
if not line:
break
@@ -153,7 +146,7 @@ class MediaMplayerPlugin(MediaPlugin):
args = line.split()
cmd_name = args.pop(0)
arguments = ', '.join([args_pprint(a) for a in args])
- self._actions[cmd_name] = '{}({})'.format(cmd_name, arguments)
+ self._actions[cmd_name] = f'{cmd_name}({arguments})'
def _exec(
self, cmd, *args, mplayer_args=None, prefix=None, wait_for_response=False
@@ -161,22 +154,27 @@ class MediaMplayerPlugin(MediaPlugin):
cmd_name = cmd
response = None
- if cmd_name == 'loadfile' or cmd_name == 'loadlist':
+ if cmd_name in {'loadfile', 'loadlist'}:
self._init_mplayer(mplayer_args)
else:
if not self._player:
self.logger.warning('MPlayer is not running')
- cmd = '{}{}{}{}\n'.format(
- prefix + ' ' if prefix else '',
- cmd_name,
- ' ' if args else '',
- ' '.join(repr(a) for a in args),
+ cmd = (
+ f'{prefix + " " if prefix else ""}'
+ + cmd_name
+ + (" " if args else "")
+ + " ".join(repr(a) for a in args)
+ + '\n'
).encode()
if not self._player:
+ self.logger.warning('Cannot send command %s: player unavailable', cmd)
+ return
+
+ if not self._player.stdin:
self.logger.warning(
- 'Cannot send command {}: player unavailable'.format(cmd)
+ 'Could not communicate with the mplayer process: the stdin is closed'
)
return
@@ -199,6 +197,12 @@ class MediaMplayerPlugin(MediaPlugin):
if not wait_for_response:
return
+ if not (self._player and self._player.stdout):
+ self.logger.warning(
+ 'Could not communicate with the mplayer process: the stdout is closed'
+ )
+ return
+
poll = select.poll()
poll.register(self._player.stdout, select.POLLIN)
last_read_time = time.time()
@@ -209,11 +213,16 @@ class MediaMplayerPlugin(MediaPlugin):
if not self._player:
break
- line = self._player.stdout.readline().decode()
+ buf = self._player.stdout.readline()
+ line = buf.decode() if isinstance(buf, bytes) else buf
last_read_time = time.time()
if line.startswith('ANS_'):
m = re.match('^([^=]+)=(.*)$', line[4:])
+ if not m:
+ self.logger.warning('Unexpected response: %s', line)
+ break
+
k, v = m.group(1), m.group(2)
v = v.strip()
if v == 'yes':
@@ -222,7 +231,8 @@ class MediaMplayerPlugin(MediaPlugin):
v = False
try:
- v = eval(v)
+ if isinstance(v, str):
+ v = eval(v) # pylint: disable=eval-used
except Exception:
pass
@@ -272,25 +282,26 @@ class MediaMplayerPlugin(MediaPlugin):
bus.post(evt_type(player='local', plugin='media.mplayer', **evt))
@action
- def play(self, resource, subtitles=None, mplayer_args=None):
+ def play(
+ self,
+ resource: str,
+ subtitles: Optional[str] = None,
+ mplayer_args: Optional[List[str]] = None,
+ ):
"""
Play a resource.
:param resource: Resource to play - can be a local file or a remote URL
- :type resource: str
-
:param subtitles: Path to optional subtitle file
- :type subtitles: str
-
:param mplayer_args: Extra runtime arguments that will be passed to the
MPlayer executable
- :type mplayer_args: list[str]
"""
self._post_event(MediaPlayRequestEvent, resource=resource)
if subtitles:
- mplayer_args = mplayer_args or []
- mplayer_args += ['-sub', self.get_subtitles_file(subtitles)]
+ subs = self.get_subtitles_file(subtitles)
+ if subs:
+ mplayer_args = list(mplayer_args or []) + ['-sub', subs]
resource = self._get_resource(resource)
if resource.startswith('file://'):
@@ -305,67 +316,78 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
- def pause(self):
+ def pause(self, *_, **__):
"""Toggle the paused state"""
self._exec('pause')
self._post_event(MediaPauseEvent)
return self.status()
@action
- def stop(self):
+ def stop(self, *_, **__):
"""Stop the playback"""
# return self._exec('stop')
self.quit()
return self.status()
@action
- def quit(self):
+ def quit(self, *_, **__):
"""Quit the player"""
self._exec('quit')
self._post_event(MediaStopEvent)
return self.status()
@action
- def voldown(self, step=10.0):
+ def voldown(self, *_, step=10.0, **__):
"""Volume down by (default: 10)%"""
self._exec('volume', -step * 10)
return self.status()
@action
- def volup(self, step=10.0):
+ def volup(self, *_, step=10.0, **__):
"""Volume up by (default: 10)%"""
self._exec('volume', step * 10)
return self.status()
@action
- def back(self, offset=30.0):
+ def back(self, *_, offset=30.0, **__):
"""Back by (default: 30) seconds"""
self.step_property('time_pos', -offset)
return self.status()
@action
- def forward(self, offset=30.0):
+ def forward(self, *_, offset=30.0, **__):
"""Forward by (default: 30) seconds"""
self.step_property('time_pos', offset)
return self.status()
@action
- def toggle_subtitles(self):
+ def toggle_subtitles(self, *_, **__):
"""Toggle the subtitles visibility"""
- subs = self.get_property('sub_visibility').output.get('sub_visibility')
+ response: dict = (
+ self.get_property('sub_visibility').output or {} # type: ignore
+ )
+ subs = response.get('sub_visibility')
self._exec('sub_visibility', int(not subs))
return self.status()
@action
- def add_subtitles(self, filename, **__):
- """Sets media subtitles from filename"""
+ def add_subtitles(self, filename: str, **__):
+ """
+ Sets media subtitles from filename
+
+ :param filename: Subtitles file.
+ """
self._exec('sub_visibility', 1)
self._exec('sub_load', filename)
return self.status()
@action
- def remove_subtitles(self, index=None):
- """Removes the subtitle specified by the index (default: all)"""
+ def remove_subtitles(self, *_, index: Optional[int] = None, **__):
+ """
+ Removes the subtitle specified by the index (default: all)
+
+ :param index: (1-based) index of the subtitles track to remove.
+ """
if index is None:
self._exec('sub_remove')
else:
@@ -374,14 +396,15 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
- def is_playing(self):
+ def is_playing(self, *_, **__):
"""
:returns: True if it's playing, False otherwise
"""
- return self.get_property('pause').output.get('pause') is False
+ response: dict = self.get_property('pause').output or {} # type: ignore
+ return response.get('pause') is False
@action
- def load(self, resource, mplayer_args=None, **kwargs):
+ def load(self, resource, *_, mplayer_args: Optional[Collection[str]] = None, **__):
"""
Load a resource/video in the player.
"""
@@ -390,13 +413,13 @@ class MediaMplayerPlugin(MediaPlugin):
return self.play(resource, mplayer_args=mplayer_args)
@action
- def mute(self):
+ def mute(self, *_, **__):
"""Toggle mute state"""
self._exec('mute')
return self.status()
@action
- def seek(self, position):
+ def seek(self, position: float, *_, **__):
"""
Seek backward/forward by the specified number of seconds
@@ -407,7 +430,7 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
- def set_position(self, position):
+ def set_position(self, position: float, *_, **__):
"""
Seek backward/forward to the specified absolute position
@@ -418,7 +441,7 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
- def set_volume(self, volume):
+ def set_volume(self, volume: float, *_, **__):
"""
Set the volume
@@ -463,7 +486,7 @@ class MediaMplayerPlugin(MediaPlugin):
with self._status_lock:
for prop, player_prop in props.items():
value = self.get_property(player_prop).output
- if value is not None:
+ if isinstance(value, dict):
status[prop] = value.get(player_prop)
status['seekable'] = bool(status['duration'])
@@ -480,7 +503,11 @@ class MediaMplayerPlugin(MediaPlugin):
return status
@action
- def get_property(self, property, args=None):
+ def get_property(
+ self,
+ property: str, # pylint: disable=redefined-builtin
+ args: Optional[Collection[str]] = None,
+ ):
"""
Get a player property (e.g. pause, fullscreen etc.). See
https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the
@@ -503,14 +530,23 @@ class MediaMplayerPlugin(MediaPlugin):
for k, v in result.items():
if k == 'ERROR' and v not in response.errors:
- response.errors.append('{}{}: {}'.format(property, args, v))
+ if not isinstance(response.errors, list):
+ response.errors = []
+ response.errors.append(f'{property}{args}: {v}')
else:
+ if not isinstance(response.output, dict):
+ response.output = {}
response.output[k] = v
return response
@action
- def set_property(self, property, value, args=None):
+ def set_property(
+ self,
+ property: str, # pylint: disable=redefined-builtin
+ value: Any,
+ args: Optional[Collection[str]] = None,
+ ):
"""
Set a player property (e.g. pause, fullscreen etc.). See
https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the
@@ -534,14 +570,25 @@ class MediaMplayerPlugin(MediaPlugin):
for k, v in result.items():
if k == 'ERROR' and v not in response.errors:
- response.errors.append('{} {}{}: {}'.format(property, value, args, v))
+ if not isinstance(response.errors, list):
+ response.errors = []
+ response.errors.append(f'{property} {value}{args}: {v}')
else:
+ if not isinstance(response.output, dict):
+ response.output = {}
response.output[k] = v
return response
@action
- def step_property(self, property, value, args=None):
+ def step_property(
+ self,
+ property: str, # pylint: disable=redefined-builtin
+ value: Any,
+ *_,
+ args: Optional[Collection[str]] = None,
+ **__,
+ ):
"""
Step a player property (e.g. volume, time_pos etc.). See
https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the
@@ -565,13 +612,18 @@ class MediaMplayerPlugin(MediaPlugin):
for k, v in result.items():
if k == 'ERROR' and v not in response.errors:
- response.errors.append('{} {}{}: {}'.format(property, value, args, v))
+ if not isinstance(response.errors, list):
+ response.errors = []
+ response.errors.append(f'{property} {value}{args}: {v}')
else:
+ if not isinstance(response.output, dict):
+ response.output = {}
response.output[k] = v
return response
- def set_subtitles(self, filename, *args, **kwargs):
+ def set_subtitles(self, filename: str, *_, **__):
+ self.logger.debug('set_subtitles called with filename=%s', filename)
raise NotImplementedError
diff --git a/platypush/plugins/media/omxplayer/__init__.py b/platypush/plugins/media/omxplayer/__init__.py
index a33597c97..37b9ea857 100644
--- a/platypush/plugins/media/omxplayer/__init__.py
+++ b/platypush/plugins/media/omxplayer/__init__.py
@@ -1,5 +1,7 @@
import enum
import threading
+from typing import Collection, Optional
+
import urllib.parse
from platypush.context import get_bus
@@ -16,6 +18,10 @@ from platypush.plugins import action
class PlayerEvent(enum.Enum):
+ """
+ Supported player events.
+ """
+
STOP = 'stop'
PLAY = 'play'
PAUSE = 'pause'
@@ -26,17 +32,18 @@ class MediaOmxplayerPlugin(MediaPlugin):
Plugin to control video and media playback using OMXPlayer.
"""
- def __init__(self, args=None, *argv, timeout: float = 20.0, **kwargs):
+ def __init__(
+ self, args: Optional[Collection[str]] = None, timeout: float = 20.0, **kwargs
+ ):
"""
:param args: Arguments that will be passed to the OMXPlayer constructor
(e.g. subtitles, volume, start position, window size etc.) see
https://github.com/popcornmix/omxplayer#synopsis and
https://python-omxplayer-wrapper.readthedocs.io/en/latest/omxplayer/#omxplayer.player.OMXPlayer
- :type args: list
:param timeout: How long the plugin should wait for a video to start upon play request (default: 20 seconds).
"""
- super().__init__(*argv, **kwargs)
+ super().__init__(**kwargs)
if args is None:
args = []
@@ -48,7 +55,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
self._play_started = threading.Event()
@action
- def play(self, resource=None, subtitles=None, *args, **kwargs):
+ def play(self, *args, resource=None, subtitles=None, **_):
"""
Play or resume playing a resource.
@@ -68,9 +75,8 @@ class MediaOmxplayerPlugin(MediaPlugin):
self._player.play()
return self.status()
- else:
- self._play_started.clear()
+ self._play_started.clear()
self._post_event(MediaPlayRequestEvent, resource=resource)
if subtitles:
@@ -141,7 +147,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
return {'status': 'stop'}
- def get_volume(self) -> float:
+ def get_volume(self) -> Optional[float]:
"""
:return: The player volume in percentage [0, 100].
"""
@@ -157,7 +163,9 @@ class MediaOmxplayerPlugin(MediaPlugin):
:type step: float
"""
if self._player:
- self.set_volume(max(0, self.get_volume() - step))
+ vol = self.get_volume()
+ if vol is not None:
+ self.set_volume(max(0, vol - step))
return self.status()
@action
@@ -169,7 +177,9 @@ class MediaOmxplayerPlugin(MediaPlugin):
:type step: float
"""
if self._player:
- self.set_volume(min(100, self.get_volume() + step))
+ vol = self.get_volume()
+ if vol is not None:
+ self.set_volume(min(100, vol + step))
return self.status()
@action
@@ -213,23 +223,19 @@ class MediaOmxplayerPlugin(MediaPlugin):
return self.status()
@action
- def is_playing(self):
+ def is_playing(self, *_, **__) -> bool:
"""
:returns: True if it's playing, False otherwise
"""
-
- return self._player.is_playing()
+ return self._player.is_playing() if self._player else False
@action
- def load(self, resource, pause=False, **kwargs):
+ def load(self, resource: str, *_, pause: bool = False, **__):
"""
Load a resource/video in the player.
:param resource: URL or filename to load
- :type resource: str
-
:param pause: If set, load the video in paused mode (default: False)
- :type pause: bool
"""
if self._player:
@@ -244,48 +250,45 @@ class MediaOmxplayerPlugin(MediaPlugin):
return self.status()
@action
- def mute(self):
+ def mute(self, *_, **__):
"""Mute the player"""
if self._player:
self._player.mute()
return self.status()
@action
- def unmute(self):
+ def unmute(self, *_, **__):
"""Unmute the player"""
if self._player:
self._player.unmute()
return self.status()
@action
- def seek(self, position):
+ def seek(self, position: float, **__):
"""
Seek to the specified number of seconds from the start.
:param position: Number of seconds from the start
- :type position: float
"""
if self._player:
self._player.set_position(position)
return self.status()
@action
- def set_position(self, position):
+ def set_position(self, position: float, **__):
"""
Seek to the specified number of seconds from the start (same as :meth:`.seek`).
:param position: Number of seconds from the start
- :type position: float
"""
return self.seek(position)
@action
- def set_volume(self, volume):
+ def set_volume(self, volume: float, *_, **__):
"""
Set the volume
:param volume: Volume value between 0 and 100
- :type volume: float
"""
if self._player:
@@ -327,7 +330,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
try:
state = self._player.playback_status().lower()
except (OMXPlayerDeadError, DBusException) as e:
- self.logger.warning(f'Could not retrieve player status: {e}')
+ self.logger.warning('Could not retrieve player status: %s', e)
if isinstance(e, OMXPlayerDeadError):
self._player = None
@@ -362,9 +365,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
def add_handler(self, event_type, callback):
if event_type not in self._handlers.keys():
- raise AttributeError(
- '{} is not a valid PlayerEvent type'.format(event_type)
- )
+ raise AttributeError(f'{event_type} is not a valid PlayerEvent type')
self._handlers[event_type].append(callback)
@@ -420,13 +421,13 @@ class MediaOmxplayerPlugin(MediaPlugin):
self._player.positionEvent += self.on_seek()
self._player.seekEvent += self.on_seek()
- def toggle_subtitles(self, *args, **kwargs):
+ def toggle_subtitles(self, *_, **__):
raise NotImplementedError
- def set_subtitles(self, filename, *args, **kwargs):
+ def set_subtitles(self, *_, **__):
raise NotImplementedError
- def remove_subtitles(self, *args, **kwargs):
+ def remove_subtitles(self, *_, **__):
raise NotImplementedError
diff --git a/platypush/plugins/media/vlc/__init__.py b/platypush/plugins/media/vlc/__init__.py
index 05f3d2a8c..e024523e6 100644
--- a/platypush/plugins/media/vlc/__init__.py
+++ b/platypush/plugins/media/vlc/__init__.py
@@ -1,7 +1,7 @@
import os
import threading
import urllib.parse
-from typing import Optional
+from typing import Collection, Optional
from platypush.context import get_bus
from platypush.plugins.media import PlayerState, MediaPlugin
@@ -24,23 +24,22 @@ class MediaVlcPlugin(MediaPlugin):
Plugin to control VLC instances.
"""
- def __init__(self, args=None, fullscreen=False, volume=100, *argv, **kwargs):
+ def __init__(
+ self,
+ args: Optional[Collection[str]] = None,
+ fullscreen: bool = False,
+ volume: int = 100,
+ **kwargs
+ ):
"""
- Create the vlc wrapper.
-
:param args: List of extra arguments to pass to the VLC executable (e.g.
``['--sub-language=en', '--snapshot-path=/mnt/snapshots']``)
- :type args: list[str]
-
:param fullscreen: Set to True if you want media files to be opened in
fullscreen by default (can be overridden by `.play()`) (default: False)
- :type fullscreen: bool
-
:param volume: Default media volume (default: 100)
- :type volume: int
"""
- super().__init__(*argv, **kwargs)
+ super().__init__(**kwargs)
self._args = args or []
self._instance = None
@@ -98,6 +97,7 @@ class MediaVlcPlugin(MediaPlugin):
self._monitor_thread = threading.Thread(target=self._player_monitor)
self._monitor_thread.start()
self._instance = vlc.Instance(*self._args)
+ assert self._instance, 'Could not create a VLC instance'
self._player = self._instance.media_player_new(resource)
for evt in self._watched_event_types():
@@ -136,65 +136,67 @@ class MediaVlcPlugin(MediaPlugin):
def callback(event):
from vlc import EventType
- self.logger.debug('Received vlc event: {}'.format(event))
-
- if event.type == EventType.MediaPlayerPlaying:
+ self.logger.debug('Received vlc event: %s', event)
+ if event.type == EventType.MediaPlayerPlaying: # type: ignore
self._post_event(MediaPlayEvent, resource=self._get_current_resource())
- elif event.type == EventType.MediaPlayerPaused:
+ elif event.type == EventType.MediaPlayerPaused: # type: ignore
self._post_event(MediaPauseEvent)
elif (
- event.type == EventType.MediaPlayerStopped
- or event.type == EventType.MediaPlayerEndReached
+ event.type == EventType.MediaPlayerStopped # type: ignore
+ or event.type == EventType.MediaPlayerEndReached # type: ignore
):
self._on_stop_event.set()
self._post_event(MediaStopEvent)
for cbk in self._on_stop_callbacks:
cbk()
- elif (
- event.type == EventType.MediaPlayerTitleChanged
- or event.type == EventType.MediaPlayerMediaChanged
+ elif self._player and (
+ event.type
+ in (
+ EventType.MediaPlayerTitleChanged, # type: ignore
+ EventType.MediaPlayerMediaChanged, # type: ignore
+ )
):
self._title = self._player.get_title() or self._filename
- if event.type == EventType.MediaPlayerMediaChanged:
+ if event.type == EventType.MediaPlayerMediaChanged: # type: ignore
self._post_event(NewPlayingMediaEvent, resource=self._title)
- elif event.type == EventType.MediaPlayerLengthChanged:
+ elif event.type == EventType.MediaPlayerLengthChanged: # type: ignore
self._post_event(
NewPlayingMediaEvent, resource=self._get_current_resource()
)
- elif event.type == EventType.MediaPlayerTimeChanged:
+ elif self._player and event.type == EventType.MediaPlayerTimeChanged: # type: ignore
pos = float(self._player.get_time() / 1000)
if self._latest_seek is None or abs(pos - self._latest_seek) > 5:
self._post_event(MediaSeekEvent, position=pos)
self._latest_seek = pos
- elif event.type == EventType.MediaPlayerAudioVolume:
+ elif self._player and event.type == EventType.MediaPlayerAudioVolume: # type: ignore
self._post_event(
MediaVolumeChangedEvent, volume=self._player.audio_get_volume()
)
- elif event.type == EventType.MediaPlayerMuted:
+ elif event.type == EventType.MediaPlayerMuted: # type: ignore
self._post_event(MediaMuteChangedEvent, mute=True)
- elif event.type == EventType.MediaPlayerUnmuted:
+ elif event.type == EventType.MediaPlayerUnmuted: # type: ignore
self._post_event(MediaMuteChangedEvent, mute=False)
return callback
@action
- def play(self, resource=None, subtitles=None, fullscreen=None, volume=None):
+ def play(
+ self,
+ resource: Optional[str] = None,
+ subtitles: Optional[str] = None,
+ fullscreen: Optional[bool] = None,
+ volume: Optional[int] = None,
+ ):
"""
Play a resource.
- :param resource: Resource to play - can be a local file or a remote URL (default: None == toggle play).
- :type resource: str
-
+ :param resource: Resource to play - can be a local file or a remote URL
+ (default: None == toggle play).
:param subtitles: Path to optional subtitle file
- :type subtitles: str
-
:param fullscreen: Set to explicitly enable/disable fullscreen (default:
`fullscreen` configured value or False)
- :type fullscreen: bool
-
:param volume: Set to explicitly set the playback volume (default:
`volume` configured value or 100)
- :type fullscreen: bool
"""
if not resource:
@@ -208,12 +210,14 @@ class MediaVlcPlugin(MediaPlugin):
self._filename = resource
self._init_vlc(resource)
- if subtitles:
+ if subtitles and self._player:
if subtitles.startswith('file://'):
subtitles = subtitles[len('file://') :]
self._player.video_set_subtitle_file(subtitles)
- self._player.play()
+ if self._player:
+ self._player.play()
+
if self.volume:
self.set_volume(volume=self.volume)
@@ -226,71 +230,60 @@ class MediaVlcPlugin(MediaPlugin):
return self.status()
@action
- def pause(self):
+ def pause(self, *_, **__):
"""Toggle the paused state"""
- if not self._player:
- return None, 'No vlc instance is running'
- if not self._player.can_pause():
- return None, 'The specified media type cannot be paused'
-
+ assert self._player, 'No vlc instance is running'
+ assert self._player.can_pause(), 'The specified media type cannot be paused'
self._player.pause()
return self.status()
@action
- def quit(self):
+ def quit(self, *_, **__):
"""Quit the player (same as `stop`)"""
with self._stop_lock:
- if not self._player:
- return None, 'No vlc instance is running'
-
+ assert self._player, 'No vlc instance is running'
self._player.stop()
self._on_stop_event.wait(timeout=5)
self._reset_state()
return self.status()
@action
- def stop(self):
+ def stop(self, *_, **__):
"""Stop the application (same as `quit`)"""
return self.quit()
@action
- def voldown(self, step=10.0):
+ def voldown(self, *_, step: float = 10.0, **__):
"""Volume down by (default: 10)%"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
return self.set_volume(int(max(0, self._player.audio_get_volume() - step)))
@action
- def volup(self, step=10.0):
+ def volup(self, *_, step: float = 10.0, **__):
"""Volume up by (default: 10)%"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
return self.set_volume(int(min(100, self._player.audio_get_volume() + step)))
@action
- def set_volume(self, volume):
+ def set_volume(self, volume: int):
"""
Set the volume
:param volume: Volume value between 0 and 100
- :type volume: float
"""
- if not self._player:
- return None, 'No vlc instance is running'
-
+ assert self._player, 'No vlc instance is running'
volume = max(0, min([100, volume]))
self._player.audio_set_volume(volume)
- status = self.status().output
+ status: dict = self.status().output # type: ignore
status['volume'] = volume
return status
@action
- def seek(self, position):
+ def seek(self, position: float):
"""
Seek backward/forward by the specified number of seconds
:param position: Number of seconds relative to the current cursor
- :type position: int
"""
if not self._player:
return None, 'No vlc instance is running'
@@ -306,7 +299,7 @@ class MediaVlcPlugin(MediaPlugin):
return self.status()
@action
- def back(self, offset=30.0):
+ def back(self, *_, offset: float = 30.0, **__):
"""Back by (default: 30) seconds"""
if not self._player:
return None, 'No vlc instance is running'
@@ -319,7 +312,7 @@ class MediaVlcPlugin(MediaPlugin):
return self.seek(pos)
@action
- def forward(self, offset=30.0):
+ def forward(self, *_, offset: float = 30.0, **__):
"""Forward by (default: 30) seconds"""
if not self._player:
return None, 'No vlc instance is running'
@@ -334,13 +327,12 @@ class MediaVlcPlugin(MediaPlugin):
return self.seek(pos)
@action
- def toggle_subtitles(self, visibile=None):
+ def toggle_subtitles(self, *_, **__):
"""Toggle the subtitles visibility"""
- if not self._player:
- return None, 'No vlc instance is running'
-
- if self._player.video_get_spu_count() == 0:
- return None, 'The media file has no subtitles set'
+ assert self._player, 'No vlc instance is running'
+ assert (
+ self._player.video_get_spu_count() > 0
+ ), 'The media file has no subtitles set'
if self._player.video_get_spu() is None or self._player.video_get_spu() == -1:
self._player.video_set_spu(0)
@@ -350,36 +342,32 @@ class MediaVlcPlugin(MediaPlugin):
@action
def toggle_fullscreen(self):
"""Toggle the fullscreen mode"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
self._player.toggle_fullscreen()
@action
- def set_fullscreen(self, fullscreen=True):
+ def set_fullscreen(self, fullscreen: bool = True):
"""Set fullscreen mode"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
self._player.set_fullscreen(fullscreen)
@action
- def set_subtitles(self, filename, **args):
+ def set_subtitles(self, filename: str, *_, **__):
"""Sets media subtitles from filename"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
if filename.startswith('file://'):
filename = filename[len('file://') :]
self._player.video_set_subtitle_file(filename)
@action
- def remove_subtitles(self):
+ def remove_subtitles(self, *_, **__):
"""Removes (hides) the subtitles"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
self._player.video_set_spu(-1)
@action
- def is_playing(self):
+ def is_playing(self, *_, **__):
"""
:returns: True if it's playing, False otherwise
"""
@@ -388,7 +376,7 @@ class MediaVlcPlugin(MediaPlugin):
return self._player.is_playing()
@action
- def load(self, resource, **args):
+ def load(self, resource, *_, **args):
"""
Load/queue a resource/video to the player
"""
@@ -398,14 +386,13 @@ class MediaVlcPlugin(MediaPlugin):
return self.status()
@action
- def mute(self):
+ def mute(self, *_, **__):
"""Toggle mute state"""
- if not self._player:
- return None, 'No vlc instance is running'
+ assert self._player, 'No vlc instance is running'
self._player.audio_toggle_mute()
@action
- def set_position(self, position):
+ def set_position(self, position: float, **_):
"""
Seek backward/forward to the specified absolute position (same as ``seek``)
"""
@@ -434,9 +421,9 @@ class MediaVlcPlugin(MediaPlugin):
status = {}
vlc_state = self._player.get_state()
- if vlc_state == vlc.State.Playing:
+ if vlc_state == vlc.State.Playing: # type: ignore
status['state'] = PlayerState.PLAY.value
- elif vlc_state == vlc.State.Paused:
+ elif vlc_state == vlc.State.Paused: # type: ignore
status['state'] = PlayerState.PAUSE.value
else:
status['state'] = PlayerState.STOP.value
@@ -446,6 +433,7 @@ class MediaVlcPlugin(MediaPlugin):
if self._player.get_media()
else None
)
+
status['position'] = (
float(self._player.get_time() / 1000)
if self._player.get_time() is not None
@@ -477,7 +465,7 @@ class MediaVlcPlugin(MediaPlugin):
def _get_current_resource(self):
if not self._player or not self._player.get_media():
- return
+ return None
return self._player.get_media().get_mrl()
diff --git a/platypush/plugins/music/mpd/__init__.py b/platypush/plugins/music/mpd/__init__.py
index d7acfaf09..b82ff7dc9 100644
--- a/platypush/plugins/music/mpd/__init__.py
+++ b/platypush/plugins/music/mpd/__init__.py
@@ -1,7 +1,7 @@
import re
import threading
import time
-from typing import Optional, Union
+from typing import Collection, Optional, Union
from platypush.plugins import action
from platypush.plugins.music import MusicPlugin
@@ -9,27 +9,29 @@ from platypush.plugins.music import MusicPlugin
class MusicMpdPlugin(MusicPlugin):
"""
- This plugin allows you to interact with an MPD/Mopidy music server. MPD
- (https://www.musicpd.org/) is a flexible server-side protocol/application
- for handling music collections and playing music, mostly aimed to manage
- local libraries. Mopidy (https://www.mopidy.com/) is an evolution of MPD,
- compatible with the original protocol and with support for multiple music
- sources through plugins (e.g. Spotify, TuneIn, Soundcloud, local files
- etc.).
+ This plugin allows you to interact with an MPD/Mopidy music server.
+
+ `MPD `_ is a flexible server-side
+ protocol/application for handling music collections and playing music,
+ mostly aimed to manage local libraries.
+
+ `Mopidy `_ is an evolution of MPD, compatible with
+ the original protocol and with support for multiple music sources through
+ plugins (e.g. Spotify, TuneIn, Soundcloud, local files etc.).
+
+ .. note:: As of Mopidy 3.0 MPD is an optional interface provided by the
+ ``mopidy-mpd`` extension. Make sure that you have the extension
+ installed and enabled on your instance to use this plugin with your
+ server.
- **NOTE**: As of Mopidy 3.0 MPD is an optional interface provided by the ``mopidy-mpd`` extension. Make sure that you
- have the extension installed and enabled on your instance to use this plugin with your server.
"""
_client_lock = threading.RLock()
- def __init__(self, host, port=6600):
+ def __init__(self, host: str, port: int = 6600):
"""
:param host: MPD IP/hostname
- :type host: str
-
:param port: MPD port (default: 6600)
- :type port: int
"""
super().__init__()
@@ -37,12 +39,12 @@ class MusicMpdPlugin(MusicPlugin):
self.port = port
self.client = None
- def _connect(self, n_tries=2):
+ def _connect(self, n_tries: int = 2):
import mpd
with self._client_lock:
if self.client:
- return
+ return self.client
error = None
while n_tries > 0:
@@ -54,9 +56,9 @@ class MusicMpdPlugin(MusicPlugin):
except Exception as e:
error = e
self.logger.warning(
- 'Connection exception: {}{}'.format(
- str(e), (': Retrying' if n_tries > 0 else '')
- )
+ 'Connection exception: %s%s',
+ e,
+ (': Retrying' if n_tries > 0 else ''),
)
time.sleep(0.5)
@@ -64,7 +66,9 @@ class MusicMpdPlugin(MusicPlugin):
if error:
raise error
- def _exec(self, method, *args, **kwargs):
+ return self.client
+
+ def _exec(self, method: str, *args, **kwargs):
error = None
n_tries = int(kwargs.pop('n_tries')) if 'n_tries' in kwargs else 2
return_status = (
@@ -84,16 +88,16 @@ class MusicMpdPlugin(MusicPlugin):
except Exception as e:
error = str(e)
self.logger.warning(
- 'Exception while executing MPD method {}: {}'.format(method, error)
+ 'Exception while executing MPD method %s: %s', method, error
)
self.client = None
return None, error
@action
- def play(self, resource=None):
+ def play(self, resource: Optional[str] = None, **__):
"""
- Play a resource by path/URI
+ Play a resource by path/URI.
:param resource: Resource path/URI
:type resource: str
@@ -106,213 +110,184 @@ class MusicMpdPlugin(MusicPlugin):
return self._exec('play')
@action
- def play_pos(self, pos):
+ def play_pos(self, pos: int):
"""
- Play a track in the current playlist by position number
+ Play a track in the current playlist by position number.
- :param pos: Position number
+ :param pos: Position number.
"""
return self._exec('play', pos)
@action
- def pause(self):
+ def pause(self, *_, **__):
"""Pause playback"""
- status = self.status().output['state']
- if status == 'play':
- return self._exec('pause')
- else:
- return self._exec('play')
+ status = self._status()['state']
+ return self._exec('pause') if status == 'play' else self._exec('play')
@action
def pause_if_playing(self):
"""Pause playback only if it's playing"""
-
- status = self.status().output['state']
- if status == 'play':
- return self._exec('pause')
+ status = self._status()['state']
+ return self._exec('pause') if status == 'play' else None
@action
def play_if_paused(self):
"""Play only if it's paused (resume)"""
-
- status = self.status().output['state']
- if status == 'pause':
- return self._exec('play')
+ status = self._status()['state']
+ return self._exec('play') if status == 'pause' else None
@action
def play_if_paused_or_stopped(self):
"""Play only if it's paused or stopped"""
-
- status = self.status().output['state']
- if status == 'pause' or status == 'stop':
- return self._exec('play')
+ status = self._status()['state']
+ return self._exec('play') if status in ('pause', 'stop') else None
@action
- def stop(self):
+ def stop(self, *_, **__):
"""Stop playback"""
return self._exec('stop')
@action
def play_or_stop(self):
"""Play or stop (play state toggle)"""
- status = self.status().output['state']
+ status = self._status()['state']
if status == 'play':
return self._exec('stop')
- else:
- return self._exec('play')
+ return self._exec('play')
@action
- def playid(self, track_id):
+ def playid(self, track_id: str):
"""
- Play a track by ID
+ Play a track by ID.
- :param track_id: Track ID
- :type track_id: str
+ :param track_id: Track ID.
"""
-
return self._exec('playid', track_id)
@action
- def next(self):
+ def next(self, *_, **__):
"""Play the next track"""
return self._exec('next')
@action
- def previous(self):
+ def previous(self, *_, **__):
"""Play the previous track"""
return self._exec('previous')
@action
- def setvol(self, vol):
+ def setvol(self, vol: int):
"""
- Set the volume (DEPRECATED, use :meth:`.set_volume` instead).
+ Set the volume.
- :param vol: Volume value (range: 0-100)
- :type vol: int
+ ..warning :: **DEPRECATED**, use :meth:`.set_volume` instead.
+
+ :param vol: Volume value (range: 0-100).
"""
return self.set_volume(vol)
@action
- def set_volume(self, volume):
+ def set_volume(self, volume: int, *_, **__):
"""
Set the volume.
- :param volume: Volume value (range: 0-100)
- :type volume: int
+ :param volume: Volume value (range: 0-100).
"""
return self._exec('setvol', str(volume))
@action
- def volup(self, delta=10):
+ def volup(self, *_, delta: int = 10, **__):
"""
- Turn up the volume
+ Turn up the volume.
- :param delta: Volume up delta (default: +10%)
- :type delta: int
+ :param delta: Volume up delta (default: +10%).
"""
-
- volume = int(self.status().output['volume'])
+ volume = int(self._status()['volume'])
new_volume = min(volume + delta, 100)
return self.setvol(new_volume)
@action
- def voldown(self, delta=10):
+ def voldown(self, *_, delta: int = 10, **__):
"""
- Turn down the volume
+ Turn down the volume.
- :param delta: Volume down delta (default: -10%)
- :type delta: int
+ :param delta: Volume down delta (default: -10%).
"""
-
- volume = int(self.status().output['volume'])
+ volume = int(self._status()['volume'])
new_volume = max(volume - delta, 0)
return self.setvol(new_volume)
- @action
- def random(self, value=None):
- """
- Set random mode
-
- :param value: If set, set the random state this value (true/false). Default: None (toggle current state)
- :type value: bool
- """
-
+ def _toggle(self, key: str, value: Optional[bool] = None):
if value is None:
- value = int(self.status().output['random'])
- value = 1 if value == 0 else 0
- return self._exec('random', value)
+ value = bool(self._status()[key])
+ return self._exec(key, int(value))
@action
- def consume(self, value=None):
+ def random(self, value: Optional[bool] = None):
"""
- Set consume mode
+ Set random mode.
- :param value: If set, set the consume state this value (true/false). Default: None (toggle current state)
- :type value: bool
+ :param value: If set, set the random state this value (true/false).
+ Default: None (toggle current state).
"""
-
- if value is None:
- value = int(self.status().output['consume'])
- value = 1 if value == 0 else 0
- return self._exec('consume', value)
+ return self._toggle('random', value)
@action
- def single(self, value=None):
+ def consume(self, value: Optional[bool] = None):
"""
- Set single mode
+ Set consume mode.
- :param value: If set, set the consume state this value (true/false). Default: None (toggle current state)
- :type value: bool
+ :param value: If set, set the consume state this value (true/false).
+ Default: None (toggle current state)
"""
-
- if value is None:
- value = int(self.status().output['single'])
- value = 1 if value == 0 else 0
- return self._exec('single', value)
+ return self._toggle('consume', value)
@action
- def repeat(self, value=None):
+ def single(self, value: Optional[bool] = None):
"""
- Set repeat mode
+ Set single mode.
- :param value: If set, set the repeat state this value (true/false). Default: None (toggle current state)
- :type value: bool
+ :param value: If set, set the consume state this value (true/false).
+ Default: None (toggle current state)
"""
+ return self._toggle('single', value)
- if value is None:
- value = int(self.status().output['repeat'])
- value = 1 if value == 0 else 0
- return self._exec('repeat', value)
+ @action
+ def repeat(self, value: Optional[bool] = None):
+ """
+ Set repeat mode.
+
+ :param value: If set, set the repeat state this value (true/false).
+ Default: None (toggle current state)
+ """
+ return self._toggle('repeat', value)
@action
def shuffle(self):
"""
- Shuffles the current playlist
+ Shuffles the current playlist.
"""
return self._exec('shuffle')
@action
- def save(self, name):
+ def save(self, name: str):
"""
- Save the current tracklist to a new playlist with the specified name
+ Save the current tracklist to a new playlist with the specified name.
:param name: Name of the playlist
- :type name: str
"""
return self._exec('save', name)
@action
- def add(self, resource, position=None):
+ def add(self, resource: str, *_, position: Optional[int] = None, **__):
"""
- Add a resource (track, album, artist, folder etc.) to the current playlist
+ Add a resource (track, album, artist, folder etc.) to the current
+ playlist.
- :param resource: Resource path or URI
- :type resource: str
-
- :param position: Position where the track(s) will be inserted (default: end of the playlist)
- :type position: int
+ :param resource: Resource path or URI.
+ :param position: Position where the track(s) will be inserted (default:
+ end of the playlist).
"""
if isinstance(resource, list):
@@ -324,7 +299,7 @@ class MusicMpdPlugin(MusicPlugin):
else:
self._exec('addid', r, position)
except Exception as e:
- self.logger.warning('Could not add {}: {}'.format(r, e))
+ self.logger.warning('Could not add %s: %s', r, e)
return self.status().output
@@ -361,7 +336,7 @@ class MusicMpdPlugin(MusicPlugin):
if isinstance(playlist, str):
playlist = [playlist]
elif not isinstance(playlist, list):
- raise RuntimeError('Invalid type for playlist: {}'.format(type(playlist)))
+ raise RuntimeError(f'Invalid type for playlist: {type(playlist)}')
for p in playlist:
self._exec('rm', p)
@@ -382,11 +357,11 @@ class MusicMpdPlugin(MusicPlugin):
@classmethod
def _parse_resource(cls, resource):
if not resource:
- return
+ return None
m = re.search(r'^https?://open\.spotify\.com/([^?]+)', resource)
if m:
- resource = 'spotify:{}'.format(m.group(1).replace('/', ':'))
+ resource = 'spotify:' + m.group(1).replace('/', ':')
if resource.startswith('spotify:'):
resource = resource.split('?')[0]
@@ -415,46 +390,59 @@ class MusicMpdPlugin(MusicPlugin):
return ret
@action
- def clear(self):
+ def clear(self, *_, **__):
"""Clear the current playlist"""
return self._exec('clear')
@action
- def seekcur(self, value):
+ def seekcur(self, value: float):
"""
Seek to the specified position (DEPRECATED, use :meth:`.seek` instead).
- :param value: Seek position in seconds, or delta string (e.g. '+15' or '-15') to indicate a seek relative to
- the current position :type value: int
+ :param value: Seek position in seconds, or delta string (e.g. '+15' or
+ '-15') to indicate a seek relative to the current position
"""
-
return self.seek(value)
@action
- def seek(self, position):
+ def seek(self, position: float, *_, **__):
"""
Seek to the specified position
- :param position: Seek position in seconds, or delta string (e.g. '+15' or '-15') to indicate a seek relative
- to the current position :type position: int
+ :param position: Seek position in seconds, or delta string (e.g. '+15'
+ or '-15') to indicate a seek relative to the current position
"""
-
return self._exec('seekcur', position)
@action
def forward(self):
"""Go forward by 15 seconds"""
-
return self._exec('seekcur', '+15')
@action
def back(self):
"""Go backward by 15 seconds"""
-
return self._exec('seekcur', '-15')
+ def _status(self) -> dict:
+ n_tries = 2
+ error = None
+
+ while n_tries > 0:
+ try:
+ n_tries -= 1
+ self._connect()
+ if self.client:
+ return self.client.status() # type: ignore
+ except Exception as e:
+ error = e
+ self.logger.warning('Exception while getting MPD status: %s', e)
+ self.client = None
+
+ raise AssertionError(str(error))
+
@action
- def status(self):
+ def status(self, *_, **__):
"""
:returns: The current state.
@@ -480,24 +468,7 @@ class MusicMpdPlugin(MusicPlugin):
}
"""
-
- n_tries = 2
- error = None
-
- while n_tries > 0:
- try:
- n_tries -= 1
- self._connect()
- if self.client:
- return self.client.status()
- except Exception as e:
- error = e
- self.logger.warning(
- 'Exception while getting MPD status: {}'.format(str(e))
- )
- self.client = None
-
- return None, error
+ return self._status()
@action
def currentsong(self):
@@ -506,9 +477,8 @@ class MusicMpdPlugin(MusicPlugin):
"""
return self.current_track()
- # noinspection PyTypeChecker
@action
- def current_track(self):
+ def current_track(self, *_, **__):
"""
:returns: The currently played track.
@@ -530,6 +500,9 @@ class MusicMpdPlugin(MusicPlugin):
"""
track = self._exec('currentsong', return_status=False)
+ if not isinstance(track, dict):
+ return None
+
if 'title' in track and (
'artist' not in track
or not track['artist']
@@ -583,7 +556,7 @@ class MusicMpdPlugin(MusicPlugin):
return self._exec('playlistinfo', return_status=False)
@action
- def get_playlists(self):
+ def get_playlists(self, *_, **__):
"""
:returns: The playlists available on the server as a list of dicts.
@@ -602,11 +575,12 @@ class MusicMpdPlugin(MusicPlugin):
# ...
}
]
+
"""
- return sorted(
- self._exec('listplaylists', return_status=False),
- key=lambda p: p['playlist'],
+ playlists: list = self._exec( # type: ignore
+ 'listplaylists', return_status=False
)
+ return sorted(playlists, key=lambda p: p['playlist'])
@action
def listplaylists(self):
@@ -616,14 +590,13 @@ class MusicMpdPlugin(MusicPlugin):
return self.get_playlists()
@action
- def get_playlist(self, playlist, with_tracks=False):
+ def get_playlist(self, playlist: str, *_, with_tracks: bool = False, **__):
"""
List the items in the specified playlist.
:param playlist: Name of the playlist
- :type playlist: str
- :param with_tracks: If True then the list of tracks in the playlist will be returned as well (default: False).
- :type with_tracks: bool
+ :param with_tracks: If True then the list of tracks in the playlist will
+ be returned as well (default: False).
"""
return self._exec(
'listplaylistinfo' if with_tracks else 'listplaylist',
@@ -632,29 +605,26 @@ class MusicMpdPlugin(MusicPlugin):
)
@action
- def listplaylist(self, name):
+ def listplaylist(self, name: str):
"""
Deprecated alias for :meth:`.playlist`.
"""
return self._exec('listplaylist', name, return_status=False)
@action
- def listplaylistinfo(self, name):
+ def listplaylistinfo(self, name: str):
"""
- Deprecated alias for :meth:`.playlist` with `with_tracks=True`.
+ Deprecated alias for :meth:`.playlist` with ``with_tracks=True``.
"""
return self.get_playlist(name, with_tracks=True)
@action
- def add_to_playlist(self, playlist, resources):
+ def add_to_playlist(self, playlist: str, resources: Union[str, Collection[str]]):
"""
Add one or multiple resources to a playlist.
:param playlist: Playlist name
- :type playlist: str
-
:param resources: URI or path of the resource(s) to be added
- :type resources: str or list[str]
"""
if isinstance(resources, str):
@@ -664,22 +634,21 @@ class MusicMpdPlugin(MusicPlugin):
self._exec('playlistadd', playlist, res)
@action
- def playlistadd(self, name, uri):
+ def playlistadd(self, name: str, uri: str):
"""
Deprecated alias for :meth:`.add_to_playlist`.
"""
return self.add_to_playlist(name, uri)
@action
- def remove_from_playlist(self, playlist, resources):
+ def remove_from_playlist(
+ self, playlist: str, resources: Union[int, Collection[int]], *_, **__
+ ):
"""
Remove one or multiple tracks from a playlist.
:param playlist: Playlist name
- :type playlist: str
-
:param resources: Position or list of positions to remove
- :type resources: int or list[int]
"""
if isinstance(resources, str):
@@ -691,62 +660,53 @@ class MusicMpdPlugin(MusicPlugin):
self._exec('playlistdelete', playlist, p)
@action
- def playlist_move(self, playlist, from_pos, to_pos):
+ def playlist_move(self, playlist: str, from_pos: int, to_pos: int, *_, **__):
"""
Change the position of a track in the specified playlist.
:param playlist: Playlist name
- :type playlist: str
-
:param from_pos: Original track position
- :type from_pos: int
-
:param to_pos: New track position
- :type to_pos: int
"""
self._exec('playlistmove', playlist, from_pos, to_pos)
@action
- def playlistdelete(self, name, pos):
+ def playlistdelete(self, name: str, pos: int):
"""
Deprecated alias for :meth:`.remove_from_playlist`.
"""
return self.remove_from_playlist(name, pos)
@action
- def playlistmove(self, name, from_pos, to_pos):
+ def playlistmove(self, name: str, from_pos: int, to_pos: int):
"""
Deprecated alias for :meth:`.playlist_move`.
"""
return self.playlist_move(name, from_pos=from_pos, to_pos=to_pos)
@action
- def playlistclear(self, name):
+ def playlistclear(self, name: str):
"""
- Clears all the elements from the specified playlist
+ Clears all the elements from the specified playlist.
- :param name: Playlist name
- :type name: str
+ :param name: Playlist name.
"""
self._exec('playlistclear', name)
@action
- def rename(self, name, new_name):
+ def rename(self, name: str, new_name: str):
"""
- Rename a playlist
+ Rename a playlist.
:param name: Original playlist name
- :type name: str
-
:param new_name: New playlist name
- :type name: str
"""
self._exec('rename', name, new_name)
@action
- def lsinfo(self, uri=None):
+ def lsinfo(self, uri: Optional[str] = None):
"""
- Returns the list of playlists and directories on the server
+ Returns the list of playlists and directories on the server.
"""
return (
@@ -756,41 +716,36 @@ class MusicMpdPlugin(MusicPlugin):
)
@action
- def plchanges(self, version):
+ def plchanges(self, version: int):
"""
Show what has changed on the current playlist since a specified playlist
version number.
:param version: Version number
- :type version: int
-
:returns: A list of dicts representing the songs being added since the specified version
"""
-
return self._exec('plchanges', version, return_status=False)
@action
- def searchaddplaylist(self, name):
+ def searchaddplaylist(self, name: str):
"""
- Search and add a playlist by (partial or full) name
+ Search and add a playlist by (partial or full) name.
- :param name: Playlist name, can be partial
- :type name: str
+ :param name: Playlist name, can be partial.
"""
+ resp: list = self._exec('listplaylists', return_status=False) # type: ignore
playlists = [
- pl['playlist']
- for pl in filter(
- lambda playlist: name.lower() in playlist['playlist'].lower(),
- self._exec('listplaylists', return_status=False),
- )
+ pl['playlist'] for pl in resp if name.lower() in pl['playlist'].lower()
]
- if len(playlists):
- self._exec('clear')
- self._exec('load', playlists[0])
- self._exec('play')
- return {'playlist': playlists[0]}
+ if not playlists:
+ return None
+
+ self._exec('clear')
+ self._exec('load', playlists[0])
+ self._exec('play')
+ return {'playlist': playlists[0]}
@staticmethod
def _make_filter(f: dict) -> list:
@@ -799,40 +754,37 @@ class MusicMpdPlugin(MusicPlugin):
ll.extend([k, v])
return ll
- # noinspection PyShadowingBuiltins
@action
- def find(self, filter: dict, *args, **kwargs):
+ def find(self, filter: dict, *args, **kwargs): # pylint: disable=redefined-builtin
"""
Find in the database/library by filter.
:param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``)
:returns: list[dict]
"""
+ filter_list = self._make_filter(filter)
+ return self._exec('find', *filter_list, *args, return_status=False, **kwargs)
- filter = self._make_filter(filter)
- return self._exec('find', *filter, *args, return_status=False, **kwargs)
-
- # noinspection PyShadowingBuiltins
@action
- def findadd(self, filter: dict, *args, **kwargs):
+ def findadd(
+ self, filter: dict, *args, **kwargs # pylint: disable=redefined-builtin
+ ):
"""
Find in the database/library by filter and add to the current playlist.
:param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``)
:returns: list[dict]
"""
+ filter_list = self._make_filter(filter)
+ return self._exec('findadd', *filter_list, *args, return_status=False, **kwargs)
- filter = self._make_filter(filter)
- return self._exec('findadd', *filter, *args, return_status=False, **kwargs)
-
- # noinspection PyShadowingBuiltins
@action
def search(
self,
- query: Optional[Union[str, dict]] = None,
- filter: Optional[dict] = None,
*args,
- **kwargs
+ query: Optional[Union[str, dict]] = None,
+ filter: Optional[dict] = None, # pylint: disable=redefined-builtin
+ **kwargs,
):
"""
Free search by filter.
@@ -842,26 +794,37 @@ class MusicMpdPlugin(MusicPlugin):
``query``, it's still here for back-compatibility reasons.
:returns: list[dict]
"""
- filter = self._make_filter(query or filter)
- items = self._exec('search', *filter, *args, return_status=False, **kwargs)
+ assert query or filter, 'Specify either `query` or `filter`'
+
+ filt = filter
+ if isinstance(query, str):
+ filt = query
+ elif isinstance(query, dict):
+ filt = {**(filter or {}), **query}
+
+ filter_list = self._make_filter(filt) if isinstance(filt, dict) else [query]
+
+ items: list = self._exec( # type: ignore
+ 'search', *filter_list, *args, return_status=False, **kwargs
+ )
# Spotify results first
return sorted(
items, key=lambda item: 0 if item['file'].startswith('spotify:') else 1
)
- # noinspection PyShadowingBuiltins
@action
- def searchadd(self, filter, *args, **kwargs):
+ def searchadd(self, filter: dict, *args, **kwargs):
"""
Free search by filter and add the results to the current playlist.
:param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``)
:returns: list[dict]
"""
-
- filter = self._make_filter(filter)
- return self._exec('searchadd', *filter, *args, return_status=False, **kwargs)
+ filter_list = self._make_filter(filter)
+ return self._exec(
+ 'searchadd', *filter_list, *args, return_status=False, **kwargs
+ )
# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/music/snapcast/__init__.py b/platypush/plugins/music/snapcast/__init__.py
index b1cd3bfcb..6ee54e9dd 100644
--- a/platypush/plugins/music/snapcast/__init__.py
+++ b/platypush/plugins/music/snapcast/__init__.py
@@ -1,6 +1,7 @@
import json
import socket
import threading
+from typing import Collection, Optional
from platypush.config import Config
from platypush.context import get_backend
@@ -9,7 +10,7 @@ from platypush.plugins import Plugin, action
class MusicSnapcastPlugin(Plugin):
"""
- Plugin to interact with a [Snapcast](https://github.com/badaix/snapcast)
+ Plugin to interact with a `Snapcast `_
instance, control clients mute status, volume, playback etc.
See https://github.com/badaix/snapcast/blob/master/doc/json_rpc_api/v2_0_0.md
@@ -19,15 +20,13 @@ class MusicSnapcastPlugin(Plugin):
_DEFAULT_SNAPCAST_PORT = 1705
_SOCKET_EOL = '\r\n'.encode()
- def __init__(self, host='localhost', port=_DEFAULT_SNAPCAST_PORT, **kwargs):
+ def __init__(
+ self, host: str = 'localhost', port: int = _DEFAULT_SNAPCAST_PORT, **kwargs
+ ):
"""
:param host: Default Snapcast server host (default: localhost)
- :type host: str
-
:param port: Default Snapcast server control port (default: 1705)
- :type port: int
"""
-
super().__init__(**kwargs)
self.host = host
@@ -46,23 +45,24 @@ class MusicSnapcastPlugin(Plugin):
self._latest_req_id += 1
return self._latest_req_id
- def _connect(self, host=None, port=None):
+ def _connect(self, host: Optional[str] = None, port: Optional[int] = None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.logger.info('Connecting to Snapcast host {}:{}'.format(host, port))
+ self.logger.info('Connecting to Snapcast host %s:%d', host, port)
sock.connect((host or self.host, port or self.port))
return sock
@classmethod
- def _send(cls, sock, req):
+ def _send(cls, sock: socket.socket, req: dict):
if isinstance(req, dict):
- req = json.dumps(req)
+ r = json.dumps(req)
if isinstance(req, str):
- req = req.encode()
- if not isinstance(req, bytes):
- raise RuntimeError('Unsupported type {} for Snapcast request: {}'.
- format(type(req), req))
+ r = req.encode()
+ if not isinstance(r, bytes):
+ raise RuntimeError(
+ f'Unsupported type {type(req)} for Snapcast request: {req}'
+ )
- sock.send(req + cls._SOCKET_EOL)
+ sock.send(r + cls._SOCKET_EOL)
@classmethod
def _recv(cls, sock):
@@ -71,133 +71,137 @@ class MusicSnapcastPlugin(Plugin):
buf += sock.recv(1)
return json.loads(buf.decode().strip()).get('result')
- def _get_group(self, sock, group):
+ def _get_group(self, sock: socket.socket, group: str):
for g in self._status(sock).get('groups', []):
if group == g.get('id') or group == g.get('name'):
return g
- def _get_client(self, sock, client):
+ return None
+
+ def _get_client(self, sock: socket.socket, client: str):
for g in self._status(sock).get('groups', []):
clients = g.get('clients', [])
for c in clients:
- if client == c.get('id') or \
- client == c.get('name') or \
- client == c.get('host', {}).get('name') or \
- client == c.get('host', {}).get('ip'):
+ if (
+ client == c.get('id')
+ or client == c.get('name')
+ or client == c.get('host', {}).get('name')
+ or client == c.get('host', {}).get('ip')
+ ):
c['group_id'] = g.get('id')
return c
- def _status(self, sock):
+ return None
+
+ def _status(self, sock: socket.socket):
request = {
'id': self._get_req_id(),
'jsonrpc': '2.0',
- 'method': 'Server.GetStatus'
+ 'method': 'Server.GetStatus',
}
- # noinspection PyTypeChecker
self._send(sock, request)
return (self._recv(sock) or {}).get('server', {})
@action
- def status(self, host=None, port=None, client=None, group=None):
+ def status(
+ self,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ client: Optional[str] = None,
+ group: Optional[str] = None,
+ ):
"""
Get the status either of a Snapcast server, client or group
:param host: Snapcast server to query (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
-
:param client: Client ID or name (default: None)
- :type client: str
-
:param group: Group ID or name (default: None)
- :type group: str
- :returns: dict.
+ :returns: dict. Example:
- Example::
+ .. code-block:: json
- "output": {
- "groups": [
- {
- "clients": [
+ "output": {
+ "groups": [
{
- "config": {
- "instance": 1,
- "latency": 0,
- "name": "",
- "volume": {
- "muted": false,
- "percent": 96
+ "clients": [
+ {
+ "config": {
+ "instance": 1,
+ "latency": 0,
+ "name": "",
+ "volume": {
+ "muted": false,
+ "percent": 96
+ }
+ },
+ "connected": true,
+ "host": {
+ "arch": "x86_64",
+ "ip": "YOUR_IP",
+ "mac": "YOUR_MAC",
+ "name": "YOUR_NAME",
+ "os": "YOUR_OS"
+ },
+ "id": "YOUR_ID",
+ "lastSeen": {
+ "sec": 1546648311,
+ "usec": 86011
+ },
+ "snapclient": {
+ "name": "Snapclient",
+ "protocolVersion": 2,
+ "version": "0.15.0"
}
- },
- "connected": true,
- "host": {
- "arch": "x86_64",
- "ip": "YOUR_IP",
- "mac": "YOUR_MAC",
- "name": "YOUR_NAME",
- "os": "YOUR_OS"
- },
+ }
+ ],
"id": "YOUR_ID",
- "lastSeen": {
- "sec": 1546648311,
- "usec": 86011
- },
- "snapclient": {
- "name": "Snapclient",
- "protocolVersion": 2,
- "version": "0.15.0"
- }
+ "muted": false,
+ "name": "",
+ "stream_id": "mopidy"
}
],
- "id": "YOUR_ID",
- "muted": false,
- "name": "",
- "stream_id": "mopidy"
- }
- ],
- "server": {
- "host": {
- "arch": "armv7l",
- "ip": "",
- "mac": "",
- "name": "YOUR_NAME",
- "os": "YOUR_OS"
- },
- "snapserver": {
- "controlProtocolVersion": 1,
- "name": "Snapserver",
- "protocolVersion": 1,
- "version": "0.15.0"
- }
- },
- "streams": [
- {
- "id": "mopidy",
- "meta": {
- "STREAM": "mopidy"
+ "server": {
+ "host": {
+ "arch": "armv7l",
+ "ip": "",
+ "mac": "",
+ "name": "YOUR_NAME",
+ "os": "YOUR_OS"
},
- "status": "playing",
- "uri": {
- "fragment": "",
- "host": "",
- "path": "/tmp/snapfifo",
- "query": {
- "buffer_ms": "20",
- "codec": "pcm",
- "name": "mopidy",
- "sampleformat": "48000:16:2"
- },
- "raw": "pipe:////tmp/snapfifo?buffer_ms=20&codec=pcm&name=mopidy&sampleformat=48000:16:2",
- "scheme": "pipe"
+ "snapserver": {
+ "controlProtocolVersion": 1,
+ "name": "Snapserver",
+ "protocolVersion": 1,
+ "version": "0.15.0"
}
- }
- ]
- }
+ },
+ "streams": [
+ {
+ "id": "mopidy",
+ "meta": {
+ "STREAM": "mopidy"
+ },
+ "status": "playing",
+ "uri": {
+ "fragment": "",
+ "host": "",
+ "path": "/tmp/snapfifo",
+ "query": {
+ "buffer_ms": "20",
+ "codec": "pcm",
+ "name": "mopidy",
+ "sampleformat": "48000:16:2"
+ },
+ "raw": "pipe:////tmp/fifo?buffer_ms=20&codec=pcm&name=mopidy&sampleformat=48000:16:2",
+ "scheme": "pipe"
+ }
+ }
+ ]
+ }
"""
@@ -213,33 +217,32 @@ class MusicSnapcastPlugin(Plugin):
return self._status(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning(f'Error on socket close: {e}')
+ self.logger.warning('Error on socket close: %s', e)
@action
- def mute(self, client=None, group=None, mute=None, host=None, port=None):
+ def mute(
+ self,
+ client: Optional[str] = None,
+ group: Optional[str] = None,
+ mute: Optional[bool] = None,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
Set the mute status of a connected client or group
:param client: Client name or ID to mute
- :type client: str
-
:param group: Group ID to mute
- :type group: str
-
:param mute: Mute status. If not set, the mute status of the selected
client/group will be toggled.
- :type mute: bool
-
:param host: Snapcast server to query (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
- if not client and not group:
+ if not (client and group):
raise RuntimeError('Please specify either a client or a group')
sock = None
@@ -250,59 +253,62 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetMute' if group else 'Client.SetVolume',
- 'params': {}
+ 'params': {},
}
if group:
- group = self._get_group(sock, group)
- cur_muted = group['muted']
- request['params']['id'] = group['id']
+ g = self._get_group(sock, group)
+ assert g, f'No such group: {group}'
+ cur_muted = g['muted']
+ request['params']['id'] = g['id']
request['params']['mute'] = not cur_muted if mute is None else mute
- else:
- client = self._get_client(sock, client)
- cur_muted = client['config']['volume']['muted']
- request['params']['id'] = client['id']
+ elif client:
+ c = self._get_client(sock, client)
+ assert c, f'No such client: {client}'
+ cur_muted = c['config']['volume']['muted']
+ request['params']['id'] = c['id']
request['params']['volume'] = {}
- request['params']['volume']['percent'] = client['config']['volume']['percent']
- request['params']['volume']['muted'] = not cur_muted if mute is None else mute
+ request['params']['volume']['percent'] = c['config']['volume'][
+ 'percent'
+ ]
+ request['params']['volume']['muted'] = (
+ not cur_muted if mute is None else mute
+ )
- # noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def volume(self, client, volume=None, delta=None, mute=None, host=None,
- port=None):
+ def volume(
+ self,
+ client: str,
+ volume: Optional[int] = None,
+ delta: Optional[int] = None,
+ mute: Optional[bool] = None,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
- Set the volume of a connected client
+ Set the volume of a connected client.
:param client: Client name or ID
- :type client: str
-
:param volume: Absolute volume to set between 0 and 100
- :type volume: int
-
:param delta: Relative volume change in percentage (e.g. +10 or -10)
- :type delta: int
-
:param mute: Set to true or false if you want to toggle the muted status
- :type mute: bool
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
if volume is None and delta is None and mute is None:
- raise RuntimeError('Please specify either an absolute volume or ' +
- 'relative delta')
+ raise RuntimeError(
+ 'Please specify either an absolute volume or ' + 'relative delta'
+ )
sock = None
@@ -312,56 +318,51 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetVolume',
- 'params': {}
+ 'params': {},
}
- client = self._get_client(sock, client)
- cur_volume = int(client['config']['volume']['percent'])
- cur_mute = bool(client['config']['volume']['muted'])
+ c = self._get_client(sock, client)
+ assert c, f'No such client: {client}'
+ cur_volume = int(c['config']['volume']['percent'])
+ cur_mute = bool(c['config']['volume']['muted'])
if volume is not None:
volume = int(volume)
elif delta is not None:
volume = cur_volume + int(delta)
- if volume is not None:
- if volume > 100: volume = 100
- if volume < 0: volume = 0
- else:
- volume = cur_volume
-
+ volume = max(0, min(100, volume)) if volume is not None else cur_volume
if mute is None:
mute = cur_mute
- request['params']['id'] = client['id']
+ request['params']['id'] = c['id']
request['params']['volume'] = {}
request['params']['volume']['percent'] = volume
request['params']['volume']['muted'] = mute
- # noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def set_client_name(self, client, name, host=None, port=None):
+ def set_client_name(
+ self,
+ client: str,
+ name: str,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
Set/change the name of a connected client
:param client: Current client name or ID to rename
- :type client: str
-
:param name: New name
- :type name: str
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
sock = None
@@ -372,37 +373,37 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetName',
- 'params': {}
+ 'params': {},
}
- client = self._get_client(sock, client)
- request['params']['id'] = client['id']
+ c = self._get_client(sock, client)
+ assert c, f'No such client: {client}'
+ request['params']['id'] = c['id']
request['params']['name'] = name
- # noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def set_group_name(self, group, name, host=None, port=None):
+ def set_group_name(
+ self,
+ group: str,
+ name: str,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
Set/change the name of a group
:param group: Group ID to rename
- :type group: str
-
:param name: New name
- :type name: str
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
sock = None
@@ -416,34 +417,33 @@ class MusicSnapcastPlugin(Plugin):
'params': {
'id': group,
'name': name,
- }
+ },
}
- # noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def set_latency(self, client, latency, host=None, port=None):
+ def set_latency(
+ self,
+ client: str,
+ latency: float,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
Set/change the latency of a connected client
:param client: Client name or ID
- :type client: str
-
:param latency: New latency in milliseconds
- :type latency: float
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
sock = None
@@ -454,35 +454,31 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetLatency',
- 'params': {
- 'latency': latency
- }
+ 'params': {'latency': latency},
}
- client = self._get_client(sock, client)
- request['params']['id'] = client['id']
- # noinspection PyTypeChecker
+ c = self._get_client(sock, client)
+ assert c, f'No such client: {client}'
+ request['params']['id'] = c['id']
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def delete_client(self, client, host=None, port=None):
+ def delete_client(
+ self, client: str, host: Optional[str] = None, port: Optional[int] = None
+ ):
"""
Delete a client from the Snapcast server
:param client: Client name or ID
- :type client: str
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
sock = None
@@ -493,176 +489,188 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Server.DeleteClient',
- 'params': {}
+ 'params': {},
}
- client = self._get_client(sock, client)
- request['params']['id'] = client['id']
- # noinspection PyTypeChecker
+ c = self._get_client(sock, client)
+ assert c, f'No such client: {client}'
+ request['params']['id'] = c['id']
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def group_set_clients(self, group, clients, host=None, port=None):
+ def group_set_clients(
+ self,
+ group: str,
+ clients: Collection[str],
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
Sets the clients for a group on a Snapcast server
:param group: Group name or ID
- :type group: str
-
:param clients: List of client names or IDs
- :type clients: list[str]
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
- group = self._get_group(sock, group)
+ g = self._get_group(sock, group)
+ assert g, f'No such group: {group}'
request = {
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetClients',
- 'params': {
- 'id': group['id'],
- 'clients': []
- }
+ 'params': {'id': g['id'], 'clients': []},
}
for client in clients:
- client = self._get_client(sock, client)
- request['params']['clients'].append(client['id'])
+ c = self._get_client(sock, client)
+ assert c, f'No such client: {client}'
+ request['params']['clients'].append(c['id'])
- # noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
- def group_set_stream(self, group, stream_id, host=None, port=None):
+ def group_set_stream(
+ self,
+ group: str,
+ stream_id: str,
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ ):
"""
Sets the active stream for a group.
:param group: Group name or ID
- :type group: str
-
:param stream_id: Stream ID
- :type stream_id: str
-
:param host: Snapcast server (default: default configured host)
- :type host: str
-
:param port: Snapcast server port (default: default configured port)
- :type port: int
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
- group = self._get_group(sock, group)
+ g = self._get_group(sock, group)
+ assert g, f'No such group: {group}'
request = {
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetStream',
'params': {
- 'id': group['id'],
+ 'id': g['id'],
'stream_id': stream_id,
- }
+ },
}
- # noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
- sock.close()
+ if sock:
+ sock.close()
except Exception as e:
- self.logger.warning('Error on socket close', e)
+ self.logger.warning('Error on socket close: %s', e)
@action
def get_backend_hosts(self):
"""
:return: A dict with the Snapcast hosts configured on the backend
- in the format host -> port
+ in the format ``host -> port``.
"""
- hosts = {}
- for i in range(len(self.backend_hosts)):
- hosts[self.backend_hosts[i]] = self.backend_ports[i]
- return hosts
+ return {
+ host: self.backend_ports[i] for i, host in enumerate(self.backend_hosts)
+ }
@action
- def get_playing_streams(self, exclude_local=False):
+ def get_playing_streams(self, exclude_local: bool = False):
"""
Returns the remote streams configured in the `music.snapcast` backend
that are currently active and unmuted.
:param exclude_local: Exclude localhost connections (default: False)
- :type exclude_local: bool
- :returns: dict with the host->port mapping.
+ :returns: dict with the host->port mapping. Example:
- Example::
+ .. code-block:: json
- {
- "hosts": {
- "server_1": 1705,
- "server_2": 1705,
- "server_3": 1705
+ {
+ "hosts": {
+ "server_1": 1705,
+ "server_2": 1705,
+ "server_3": 1705
+ }
}
- }
"""
- backend_hosts = self.get_backend_hosts().output
+ backend_hosts: dict = self.get_backend_hosts().output # type: ignore
playing_hosts = {}
def _worker(host, port):
try:
- if exclude_local and (host == 'localhost'
- or host == Config.get('device_id')):
+ if exclude_local and (
+ host == 'localhost' or host == Config.get('device_id')
+ ):
return
- server_status = self.status(host=host, port=port).output
- client_status = self.status(host=host, port=port,
- client=Config.get('device_id')).output
+ server_status: dict = self.status(host=host, port=port).output # type: ignore
+ client_status: dict = self.status( # type: ignore
+ host=host, port=port, client=Config.get('device_id')
+ ).output
if client_status.get('config', {}).get('volume', {}).get('muted'):
return
- group = [g for g in server_status.get('groups', {})
- if g.get('id') == client_status.get('group_id')].pop(0)
+ group = next(
+ iter(
+ g
+ for g in server_status.get('groups', {})
+ if g.get('id') == client_status.get('group_id')
+ )
+ )
if group.get('muted'):
return
- stream = [s for s in server_status.get('streams')
- if s.get('id') == group.get('stream_id')].pop(0)
+ stream = next(
+ iter(
+ s
+ for s in server_status.get('streams', {})
+ if s.get('id') == group.get('stream_id')
+ )
+ )
if stream.get('status') != 'playing':
return
playing_hosts[host] = port
except Exception as e:
- self.logger.warning(('Error while retrieving the status of ' +
- 'Snapcast host at {}:{}: {}').format(
- host, port, str(e)))
+ self.logger.warning(
+ 'Error while retrieving the status of Snapcast host at %s:%d: %s',
+ host,
+ port,
+ e,
+ )
workers = []
@@ -677,4 +685,5 @@ class MusicSnapcastPlugin(Plugin):
return {'hosts': playing_hosts}
+
# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/zwave/mqtt/__init__.py b/platypush/plugins/zwave/mqtt/__init__.py
index 38c367a60..5846e5ab4 100644
--- a/platypush/plugins/zwave/mqtt/__init__.py
+++ b/platypush/plugins/zwave/mqtt/__init__.py
@@ -79,10 +79,6 @@ class ZwaveMqttPlugin(
This plugin allows you to manage a Z-Wave network over MQTT through
`zwave-js-ui `_.
- For historical reasons, it is advised to enable this plugin together
- with the ``zwave.mqtt`` backend, or you may lose the ability to listen
- to asynchronous events.
-
Configuration required on the zwave-js-ui gateway:
* Install the gateway following the instructions reported