Compare commits

..

4 commits

Author SHA1 Message Date
3086dd86fc
LINT+Black+stability fixes for some plugins that hadn't been touched in a while.
All checks were successful
continuous-integration/drone/push Build is passing
- media.mplayer
- media.omxplayer
- media.vlc
- music.mpd
- music.snapcast
2023-10-01 22:55:06 +02:00
2aefc4e5c8
Several improvements for the Google integrations.
1. Improved documentation. Every plugin now reports the exact steps to
   get the integration up and running with the right API scopes.

2. All Google plugins now have a standard process to get (and reuse) the
   client secret. Except for PubSub, Translate and Maps (which have
   their own flows), all the Google plugins now read the client secrets
   from `<WORKDIR>/credentials/google/client_secret.json` by default.

3. Black/LINT for some of those plugins, which hadn't been touched in a
   while.

4. The interface to pass API scopes is now leaner. It's now possible to
   pass a scope directly as e.g. `calendar.readonly` rather than
   `https://www.googleapis.com/auth/calendar.readonly`.

5. Improved the logic to retrieve the right scope tokens file. If e.g.
   an integration requires the role `A`, and a credentials file exists
   for the roles `A` and `B`, then this file will be used rather than
   prompting the user to authenticate again.
2023-10-01 15:37:20 +02:00
5ca3757834
A more readable configuration for the calendar plugin.
The old type configuration
(`platypush.plugins.calendar.name.CalendarNamePlugin`) is a bit clunky.

Instead, since the type will always be a plugin, we should encourage
the use of `calendar.name` directly to identify the type.
2023-10-01 01:09:15 +02:00
966a6ce29e
httplib2 should be an explicit dependency for Google integrations.
Plus, some misc LINT/Black chores.
2023-10-01 00:52:59 +02:00
30 changed files with 1320 additions and 925 deletions

View file

@ -301,11 +301,11 @@ backend.http:
# # Installing the dependencies: pip install 'platypush[ical,google]'
# calendar:
# calendars:
# - type: platypush.plugins.google.calendar.GoogleCalendarPlugin
# - type: platypush.plugins.calendar.ical.CalendarIcalPlugin
# - type: google.calendar
# - type: calendar.ical
# url: https://www.facebook.com/events/ical/upcoming/?uid=your_user_id&key=your_key
# - type: platypush.plugins.calendar.ical.CalendarIcalPlugin
# url: http://riemann/nextcloud/remote.php/dav/public-calendars/9JBWHR7iioM88Y4D?export
# - type: calendar.ical
# url: https://my.nextcloud.org/remote.php/dav/public-calendars/id?export
###
###

View file

@ -5,6 +5,7 @@ import importlib
from abc import ABCMeta, abstractmethod
from platypush.context import get_plugin
from platypush.plugins import Plugin, action
@ -32,10 +33,10 @@ class CalendarPlugin(Plugin, CalendarInterface):
calendars:
# Use the Google Calendar integration
- type: platypush.plugins.google.calendar.GoogleCalendarPlugin
- type: google.calendar
# Import the Facebook events calendar via iCal URL
- type: platypush.plugins.calendar.ical.IcalCalendarPlugin
- type: calendar.ical
url: https://www.facebook.com/ical/u.php?uid=USER_ID&key=FB_KEY
"""
@ -44,17 +45,24 @@ class CalendarPlugin(Plugin, CalendarInterface):
self.calendars = []
for calendar in calendars:
if 'type' not in calendar:
cal_type = calendar.pop('type', None)
if cal_type is None:
self.logger.warning(
"Invalid calendar with no type specified: {}".format(calendar)
"Invalid calendar with no type specified: %s", calendar
)
continue
cal_type = calendar.pop('type')
try:
# New `calendar.name` format
cal_plugin = get_plugin(cal_type).__class__
except Exception:
# Legacy `platypush.plugins.calendar.name.CalendarNamePlugin` format
module_name = '.'.join(cal_type.split('.')[:-1])
class_name = cal_type.split('.')[-1]
module = importlib.import_module(module_name)
self.calendars.append(getattr(module, class_name)(**calendar))
cal_plugin = getattr(module, class_name)
self.calendars.append(cal_plugin(**calendar))
@action
def get_upcoming_events(self, max_results=10):
@ -105,7 +113,9 @@ class CalendarPlugin(Plugin, CalendarInterface):
cal_events = calendar.get_upcoming_events().output or []
events.extend(cal_events)
except Exception as e:
self.logger.warning('Could not retrieve events: {}'.format(str(e)))
self.logger.warning(
'Could not retrieve events from calendar %s: %s', calendar, e
)
events = sorted(
events,

View file

@ -25,7 +25,7 @@ class DbPlugin(Plugin):
_db_error_wait_interval = 5.0
_db_error_retries = 3
def __init__(self, engine=None, **kwargs):
def __init__(self, engine=None, *args, **kwargs):
"""
:param engine: Default SQLAlchemy connection engine string (e.g.
``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``)
@ -42,7 +42,7 @@ class DbPlugin(Plugin):
super().__init__()
self.engine_url = engine
self.engine = self.get_engine(engine, **kwargs)
self.engine = self.get_engine(engine, *args, **kwargs)
def get_engine(
self, engine: Optional[Union[str, Engine]] = None, *args, **kwargs

View file

@ -15,7 +15,8 @@ class FoursquarePlugin(Plugin):
- Copy the ``client_id`` and ``client_secret``.
- Add a redirect URL. It must point to a valid IP/hostname with a web server running, even if it runs
locally. You can also use the local URL of the platypush web server - e.g. http://192.168.1.2:8008/.
- Open the following URL: ``https://foursquare.com/oauth2/authenticate?client_id=CLIENT_ID&response_type=token&redirect_uri=REDIRECT_URI``.
- Open the following URL:
``https://foursquare.com/oauth2/authenticate?client_id=CLIENT_ID&response_type=token&redirect_uri=REDIRECT_URI``.
Replace ``CLIENT_ID`` and ``REDIRECT_URI`` with the parameters from your app.
- Allow the application. You will be redirected to the URL you provided. Copy the ``access_token`` provided in
the URL.
@ -26,14 +27,16 @@ class FoursquarePlugin(Plugin):
def __init__(self, access_token: str, **kwargs):
"""
:param access_token:
:param access_token: The access token to use to authenticate to the Foursquare API.
"""
super().__init__(**kwargs)
self.access_token = access_token
def _get_url(self, endpoint):
return '{url}/{endpoint}?oauth_token={token}&v={version}'.format(
url=self.api_base_url, endpoint=endpoint, token=self.access_token,
url=self.api_base_url,
endpoint=endpoint,
token=self.access_token,
version=datetime.date.today().strftime('%Y%m%d'),
)
@ -44,11 +47,18 @@ class FoursquarePlugin(Plugin):
:return: A list of checkins, as returned by the Foursquare API.
"""
url = self._get_url('users/self/checkins')
return requests.get(url).json().get('response', {}).get('checkins', {}).get('items', [])
return (
requests.get(url)
.json()
.get('response', {})
.get('checkins', {})
.get('items', [])
)
# noinspection DuplicatedCode
@action
def search(self,
def search(
self,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude: Optional[float] = None,
@ -61,7 +71,8 @@ class FoursquarePlugin(Plugin):
categories: Optional[List[str]] = None,
radius: Optional[int] = None,
sw: Optional[Union[Tuple[float], List[float]]] = None,
ne: Optional[Union[Tuple[float], List[float]]] = None,) -> List[Dict[str, Any]]:
ne: Optional[Union[Tuple[float], List[float]]] = None,
) -> List[Dict[str, Any]]:
"""
Search for venues.
@ -82,7 +93,9 @@ class FoursquarePlugin(Plugin):
:param ne: North/east boundary box as a ``[latitude, longitude]`` pair.
:return: A list of venues, as returned by the Foursquare API.
"""
assert (latitude and longitude) or near, 'Specify either latitude/longitude or near'
assert (
latitude and longitude
) or near, 'Specify either latitude/longitude or near'
args = {}
if latitude and longitude:
@ -111,11 +124,14 @@ class FoursquarePlugin(Plugin):
args['ne'] = ne
url = self._get_url('venues/search')
return requests.get(url, params=args).json().get('response', {}).get('venues', [])
return (
requests.get(url, params=args).json().get('response', {}).get('venues', [])
)
# noinspection DuplicatedCode
@action
def explore(self,
def explore(
self,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
altitude: Optional[float] = None,
@ -131,7 +147,8 @@ class FoursquarePlugin(Plugin):
sort_by_distance: Optional[bool] = None,
sort_by_popularity: Optional[bool] = None,
price: Optional[List[int]] = None,
saved: Optional[bool] = None) -> List[Dict[str, Any]]:
saved: Optional[bool] = None,
) -> List[Dict[str, Any]]:
"""
Explore venues around a location.
@ -168,7 +185,9 @@ class FoursquarePlugin(Plugin):
:return: A list of venues, as returned by the Foursquare API.
"""
assert (latitude and longitude) or near, 'Specify either latitude/longitude or near'
assert (
latitude and longitude
) or near, 'Specify either latitude/longitude or near'
args = {}
if latitude and longitude:
@ -203,15 +222,19 @@ class FoursquarePlugin(Plugin):
args['price'] = ','.join([str(p) for p in price])
url = self._get_url('venues/explore')
return requests.get(url, params=args).json().get('response', {}).get('venues', [])
return (
requests.get(url, params=args).json().get('response', {}).get('venues', [])
)
@action
def trending(self,
def trending(
self,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
near: Optional[str] = None,
limit: Optional[int] = None,
radius: Optional[int] = None) -> List[Dict[str, Any]]:
radius: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Get the trending venues around a location.
@ -224,7 +247,9 @@ class FoursquarePlugin(Plugin):
:return: A list of venues, as returned by the Foursquare API.
"""
assert (latitude and longitude) or near, 'Specify either latitude/longitude or near'
assert (
latitude and longitude
) or near, 'Specify either latitude/longitude or near'
args = {}
if latitude and longitude:
@ -237,24 +262,29 @@ class FoursquarePlugin(Plugin):
args['radius'] = radius
url = self._get_url('venues/trending')
return requests.get(url, params=args).json().get('response', {}).get('venues', [])
return (
requests.get(url, params=args).json().get('response', {}).get('venues', [])
)
@staticmethod
def _parse_time(t):
if isinstance(t, int) or isinstance(t, float):
if isinstance(t, (int, float)):
return datetime.datetime.fromtimestamp(t)
if isinstance(t, str):
return datetime.datetime.fromisoformat(t)
assert isinstance(t, datetime.datetime), 'Cannot parse object of type {} into datetime: {}'.format(
type(t), t)
assert isinstance(
t, datetime.datetime
), 'Cannot parse object of type {} into datetime: {}'.format(type(t), t)
return t
@action
def time_series(self,
def time_series(
self,
venue_id: Union[str, List[str]],
start_at: Union[int, float, datetime.datetime, str],
end_at: Union[int, float, datetime.datetime, str]) -> List[Dict[str, Any]]:
end_at: Union[int, float, datetime.datetime, str],
) -> List[Dict[str, Any]]:
"""
Get the visitors stats about one or multiple venues over a time range. The user must be a manager of
those venues.
@ -275,13 +305,17 @@ class FoursquarePlugin(Plugin):
}
url = self._get_url('venues/timeseries')
return requests.get(url, params=args).json().get('response', {}).get('venues', [])
return (
requests.get(url, params=args).json().get('response', {}).get('venues', [])
)
@action
def stats(self,
def stats(
self,
venue_id: str,
start_at: Union[int, float, datetime.datetime, str],
end_at: Union[int, float, datetime.datetime, str]) -> List[Dict[str, Any]]:
end_at: Union[int, float, datetime.datetime, str],
) -> List[Dict[str, Any]]:
"""
Get the stats about a venue over a time range. The user must be a manager of that venue.
@ -297,7 +331,9 @@ class FoursquarePlugin(Plugin):
}
url = self._get_url('venues/{}/stats'.format(venue_id))
return requests.get(url, params=args).json().get('response', {}).get('venues', [])
return (
requests.get(url, params=args).json().get('response', {}).get('venues', [])
)
@action
def managed(self) -> List[Dict[str, Any]]:
@ -306,10 +342,17 @@ class FoursquarePlugin(Plugin):
:return: A list of venues, as returned by the Foursquare API.
"""
url = self._get_url('venues/managed')
return requests.get(url).json().get('response', {}).get('venues', []).get('items', [])
return (
requests.get(url)
.json()
.get('response', {})
.get('venues', [])
.get('items', [])
)
@action
def checkin(self,
def checkin(
self,
venue_id: str,
latitude: Optional[float] = None,
longitude: Optional[float] = None,
@ -317,7 +360,8 @@ class FoursquarePlugin(Plugin):
latlng_accuracy: Optional[float] = None,
altitude_accuracy: Optional[float] = None,
shout: Optional[str] = None,
broadcast: Optional[List[str]] = None) -> Dict[str, Any]:
broadcast: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""
Create a new check-in.
@ -350,10 +394,14 @@ class FoursquarePlugin(Plugin):
if shout:
args['shout'] = shout
if broadcast:
args['broadcast'] = ','.join(broadcast) if isinstance(broadcast, list) else broadcast
args['broadcast'] = (
','.join(broadcast) if isinstance(broadcast, list) else broadcast
)
url = self._get_url('checkins/add')
return requests.post(url, data=args).json().get('response', {}).get('checkin', {})
return (
requests.post(url, data=args).json().get('response', {}).get('checkin', {})
)
# vim:sw=4:ts=4:et:

View file

@ -1,56 +1,93 @@
from typing import Collection, Optional
from platypush.plugins import Plugin
class GooglePlugin(Plugin):
"""
Executes calls to the Google APIs using the google-api-python-client.
Integrates with the Google APIs using the google-api-python-client.
This class is extended by ``GoogleMailPlugin``, ``GoogleCalendarPlugin`` etc.
In order to use Google services (like GMail, Maps, Calendar etc.) with
your account you need to:
1. Create your Google application, if you don't have one already, on
the developers console, https://console.developers.google.com
the `developers console <https://console.developers.google.com>`_.
2. Click on "Credentials", then "Create credentials" -> "OAuth client ID"
2. You may have to explicitly enable your user to use the app if the app
is created in test mode. Go to "OAuth consent screen" and add your user's
email address to the list of authorized users.
3 Select "Other", enter whichever description you like, and create
3. Select the scopes that you want to enable for your application, depending
on the integrations that you want to use.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes.
4. Click on the "Download JSON" icon next to your newly created client ID
4. Click on "Credentials", then "Create credentials" -> "OAuth client ID".
5. Generate a credentials file for the needed scope::
5 Select "Desktop app", enter whichever name you like, and click "Create".
6. Click on the "Download JSON" icon next to your newly created client ID.
7. Generate a credentials file for the required scope:
.. code-block:: bash
mkdir -p <WORKDIR>/credentials/google
python -m platypush.plugins.google.credentials \
'https://www.googleapis.com/auth/gmail.compose' ~/client_secret.json
'calendar.readonly' \
<WORKDIR>/credentials/google/client_secret.json
"""
def __init__(self, scopes=None, **kwargs):
def __init__(
self,
scopes: Optional[Collection[str]] = None,
secrets_path: Optional[str] = None,
**kwargs
):
"""
Initialized the Google plugin with the required scopes.
:param scopes: List of required scopes
:type scopes: list
:param scopes: List of scopes required by the API.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes. Override it in your configuration
only if you need specific scopes that aren't normally required by the
plugin.
:param secrets_path: Path to the client secrets file.
You can create your secrets.json from https://console.developers.google.com.
Default: ``<PLATYPUSH_WORKDIR>/credentials/google/client_secret.json``.
"""
from platypush.plugins.google.credentials import get_credentials
from platypush.plugins.google.credentials import (
get_credentials,
default_secrets_file,
)
super().__init__(**kwargs)
self._scopes = scopes or []
self._secrets_path: str = secrets_path or default_secrets_file
if self._scopes:
scopes = ' '.join(sorted(self._scopes))
self.credentials = {scopes: get_credentials(scopes)}
scopes = " ".join(sorted(self._scopes))
try:
self.credentials = {
scopes: get_credentials(scopes, secrets_file=self._secrets_path)
}
except AssertionError as e:
self.logger.warning(str(e))
else:
self.credentials = {}
def get_service(self, service, version, scopes=None):
def get_service(
self, service: str, version: str, scopes: Optional[Collection[str]] = None
):
import httplib2
from apiclient import discovery
if scopes is None:
scopes = getattr(self, 'scopes', [])
scopes = getattr(self, "scopes", [])
scopes = ' '.join(sorted(scopes))
scopes = " ".join(sorted(scopes))
credentials = self.credentials[scopes]
http = credentials.authorize(httplib2.Http())
return discovery.build(service, version, http=http, cache_discovery=False)

View file

@ -6,11 +6,41 @@ from platypush.plugins.calendar import CalendarInterface
class GoogleCalendarPlugin(GooglePlugin, CalendarInterface):
"""
r"""
Google Calendar plugin.
In order to use this plugin:
1. Create your Google application, if you don't have one already, on
the `developers console <https://console.developers.google.com>`_.
2. You may have to explicitly enable your user to use the app if the app
is created in test mode. Go to "OAuth consent screen" and add your user's
email address to the list of authorized users.
3. Select the scopes that you want to enable for your application, depending
on the integrations that you want to use.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes.
4. Click on "Credentials", then "Create credentials" -> "OAuth client ID".
5 Select "Desktop app", enter whichever name you like, and click "Create".
6. Click on the "Download JSON" icon next to your newly created client ID.
7. Generate a credentials file for the required scope:
.. code-block:: bash
mkdir -p <WORKDIR>/credentials/google
python -m platypush.plugins.google.credentials \
'calendar.readonly' \
<WORKDIR>/credentials/google/client_secret.json
"""
scopes = ['https://www.googleapis.com/auth/calendar.readonly']
scopes = ['calendar.readonly']
def __init__(self, *args, **kwargs):
super().__init__(scopes=self.scopes, *args, **kwargs)

View file

@ -5,20 +5,25 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- httplib2
package: platypush.plugins.google.calendar
type: plugin

View file

@ -1,35 +1,93 @@
import argparse
import httplib2
import os
import re
import sys
import textwrap as tw
from typing import List, Optional
import httplib2
from oauth2client import client
from oauth2client import tools
from oauth2client.file import Storage
from platypush.config import Config
def get_credentials_filename(*scopes):
from platypush.config import Config
credentials_dir = os.path.join(Config.get_workdir(), "credentials", "google")
default_secrets_file = os.path.join(credentials_dir, "client_secret.json")
"""Default path for the Google API client secrets file"""
scope_name = '-'.join([scope.split('/')[-1] for scope in scopes])
credentials_dir = os.path.join(
Config.get('workdir'), 'credentials', 'google')
def _parse_scopes(*scopes: str) -> List[str]:
return sorted(
{
t.split("/")[-1].strip()
for scope in scopes
for t in re.split(r"[\s,]", scope)
if t
}
)
def get_credentials_filename(*scopes: str):
parsed_scopes = _parse_scopes(*scopes)
scope_name = "-".join([scope.split("/")[-1] for scope in parsed_scopes])
os.makedirs(credentials_dir, exist_ok=True)
return os.path.join(credentials_dir, scope_name + '.json')
matching_scope_file = next(
iter(
os.path.join(credentials_dir, scopes_file)
for scopes_file in {
os.path.basename(file)
for file in os.listdir(credentials_dir)
if file.endswith(".json")
}
if not set(parsed_scopes).difference(
set(scopes_file.split(".json")[0].split("-"))
)
),
None,
)
if matching_scope_file:
return matching_scope_file
return os.path.join(credentials_dir, scope_name + ".json")
def get_credentials(scope):
credentials_file = get_credentials_filename(*sorted(scope.split(' ')))
if not os.path.exists(credentials_file):
raise RuntimeError(('Credentials file {} not found. Generate it through:\n' +
'\tpython -m platypush.plugins.google.credentials "{}" ' +
'<path to client_secret.json>\n' +
'\t\t[--auth_host_name AUTH_HOST_NAME]\n' +
'\t\t[--noauth_local_webserver]\n' +
'\t\t[--auth_host_port [AUTH_HOST_PORT [AUTH_HOST_PORT ...]]]\n' +
'\t\t[--logging_level [DEBUG,INFO,WARNING,ERROR,CRITICAL]]\n').
format(credentials_file, scope))
def get_credentials(scope: str, secrets_file: Optional[str] = None):
scopes = _parse_scopes(scope)
credentials_file = get_credentials_filename(*scopes)
# If we don't have a credentials file for the required set of scopes, but we have a secrets file,
# then try and generate the credentials file from the stored secrets.
if (
not os.path.isfile(credentials_file)
and secrets_file
and os.path.isfile(secrets_file)
):
# If DISPLAY or BROWSER are set, then we can open the authentication URL in the browser.
# Otherwise, we'll have to use the --noauth_local_webserver flag and copy/paste the URL
args = (
["--noauth_local_webserver"]
if not (os.getenv("DISPLAY") or os.getenv("BROWSER"))
else []
)
generate_credentials(secrets_file, scope, *args)
assert os.path.isfile(credentials_file), tw.dedent(
f"""
Credentials file {credentials_file} not found. Generate it through:
python -m platypush.plugins.google.credentials "{','.join(scopes)}" /path/to/client_secret.json
[--auth_host_name AUTH_HOST_NAME]
[--noauth_local_webserver]
[--auth_host_port [AUTH_HOST_PORT [AUTH_HOST_PORT ...]]]
[--logging_level [DEBUG,INFO,WARNING,ERROR,CRITICAL]]
Specify --noauth_local_webserver if you're running this script on a headless machine.
You will then get an authentication URL on the logs.
Otherwise, the URL will be opened in the available browser.
"""
)
store = Storage(credentials_file)
credentials = store.get()
@ -40,16 +98,20 @@ def get_credentials(scope):
return credentials
def generate_credentials(client_secret_path, scope):
credentials_file = get_credentials_filename(*sorted(scope.split(' ')))
def generate_credentials(client_secret_path: str, scope: str, *args: str):
scopes = _parse_scopes(scope)
credentials_file = get_credentials_filename(*scopes)
store = Storage(credentials_file)
scope = ' '.join(
f'https://www.googleapis.com/auth/{scope}' for scope in _parse_scopes(scope)
)
flow = client.flow_from_clientsecrets(client_secret_path, scope)
flow.user_agent = 'Platypush'
flow.access_type = 'offline'
flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args()
flow.user_agent = "Platypush"
flow.access_type = "offline" # type: ignore
flags = argparse.ArgumentParser(parents=[tools.argparser]).parse_args(args) # type: ignore
tools.run_flow(flow, store, flags)
print('Storing credentials to ' + credentials_file)
print("Storing credentials to", credentials_file)
def main():
@ -57,23 +119,29 @@ def main():
Generates a Google API credentials file given client secret JSON and scopes.
Usage::
python -m platypush.plugins.google.credentials [client_secret.json location] [comma-separated list of scopes]
python -m platypush.plugins.google.credentials \
[spaces/comma-separated list of scopes] \
[client_secret.json location]
"""
scope = sys.argv.pop(1) if len(sys.argv) > 1 \
else input('Space separated list of OAuth scopes: ')
args = sys.argv[1:]
scope = (
args.pop(0) if args else input("Space/comma separated list of OAuth scopes: ")
).strip()
client_secret_path = os.path.expanduser(
sys.argv.pop(1) if len(sys.argv) > 1
else input('Google credentials JSON file location: '))
if args:
client_secret_path = args.pop(0)
elif os.path.isfile(default_secrets_file):
client_secret_path = default_secrets_file
else:
client_secret_path = input("Google credentials JSON file location: ")
# Uncomment to force headless (no browser spawned) authentication
# sys.argv.append('--noauth_local_webserver')
generate_credentials(client_secret_path, scope)
client_secret_path = os.path.abspath(os.path.expanduser(client_secret_path)).strip()
generate_credentials(client_secret_path, scope, *args)
if __name__ == '__main__':
if __name__ == "__main__":
main()
# vim:sw=4:ts=4:et:

View file

@ -8,8 +8,36 @@ from platypush.message.response.google.drive import GoogleDriveFile
class GoogleDrivePlugin(GooglePlugin):
"""
r"""
Google Drive plugin.
1. Create your Google application, if you don't have one already, on
the `developers console <https://console.developers.google.com>`_.
2. You may have to explicitly enable your user to use the app if the app
is created in test mode. Go to "OAuth consent screen" and add your user's
email address to the list of authorized users.
3. Select the scopes that you want to enable for your application, depending
on the integrations that you want to use.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes.
4. Click on "Credentials", then "Create credentials" -> "OAuth client ID".
5 Select "Desktop app", enter whichever name you like, and click "Create".
6. Click on the "Download JSON" icon next to your newly created client ID.
7. Generate a credentials file for the required scope:
.. code-block:: bash
mkdir -p <WORKDIR>/credentials/google
python -m platypush.plugins.google.credentials \
'drive,drive.appfolder,drive.photos.readonly' \
<WORKDIR>/credentials/google/client_secret.json
"""
scopes = [
@ -21,7 +49,7 @@ class GoogleDrivePlugin(GooglePlugin):
def __init__(self, *args, **kwargs):
super().__init__(scopes=self.scopes, *args, **kwargs)
def get_service(self, **kwargs):
def get_service(self, **_):
return super().get_service(service='drive', version='v3')
# noinspection PyShadowingBuiltins
@ -85,7 +113,7 @@ class GoogleDrivePlugin(GooglePlugin):
else:
filter += ' '
filter += "'{}' in parents".format(folder_id)
filter += f"'{folder_id}' in parents"
while True:
results = (
@ -216,7 +244,7 @@ class GoogleDrivePlugin(GooglePlugin):
while not done:
status, done = downloader.next_chunk()
self.logger.info('Download progress: {}%'.format(status.progress()))
self.logger.info('Download progress: %s%%', status.progress())
with open(path, 'wb') as f:
f.write(fh.getbuffer().tobytes())
@ -269,8 +297,8 @@ class GoogleDrivePlugin(GooglePlugin):
add_parents: Optional[List[str]] = None,
remove_parents: Optional[List[str]] = None,
mime_type: Optional[str] = None,
starred: bool = None,
trashed: bool = None,
starred: Optional[bool] = None,
trashed: Optional[bool] = None,
) -> GoogleDriveFile:
"""
Update the metadata or the content of a file.

View file

@ -5,20 +5,25 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- httplib2
package: platypush.plugins.google.drive
type: plugin

View file

@ -3,8 +3,45 @@ from platypush.plugins.google import GooglePlugin
class GoogleFitPlugin(GooglePlugin):
"""
r"""
Google Fit plugin.
In order to use this plugin:
1. Create your Google application, if you don't have one already, on
the `developers console <https://console.developers.google.com>`_.
2. You may have to explicitly enable your user to use the app if the app
is created in test mode. Go to "OAuth consent screen" and add your user's
email address to the list of authorized users.
3. Select the scopes that you want to enable for your application, depending
on the integrations that you want to use.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes.
4. Click on "Credentials", then "Create credentials" -> "OAuth client ID".
5 Select "Desktop app", enter whichever name you like, and click "Create".
6. Click on the "Download JSON" icon next to your newly created client ID.
7. Generate a credentials file for the required scope:
.. code-block:: bash
$ mkdir -p <WORKDIR>/credentials/google
$ roles="
fitness.activity.read,
fitness.body.read,
fitness.body_temperature.read,
fitness.heart_rate.read,
fitness.sleep.read,
fitness.location.read
"
$ python -m platypush.plugins.google.credentials "$roles" \
<WORKDIR>/credentials/google/client_secret.json
"""
scopes = [

View file

@ -5,20 +5,25 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- httplib2
package: platypush.plugins.google.fit
type: plugin

View file

@ -15,8 +15,38 @@ from platypush.plugins.google import GooglePlugin
class GoogleMailPlugin(GooglePlugin):
"""
GMail plugin. It allows you to programmatically compose and (TODO) get emails
r"""
GMail plugin. It allows you to programmatically compose and (TODO) get emails.
To use this plugin:
1. Create your Google application, if you don't have one already, on
the `developers console <https://console.developers.google.com>`_.
2. You may have to explicitly enable your user to use the app if the app
is created in test mode. Go to "OAuth consent screen" and add your user's
email address to the list of authorized users.
3. Select the scopes that you want to enable for your application, depending
on the integrations that you want to use.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes.
4. Click on "Credentials", then "Create credentials" -> "OAuth client ID".
5 Select "Desktop app", enter whichever name you like, and click "Create".
6. Click on the "Download JSON" icon next to your newly created client ID.
7. Generate a credentials file for the required scope:
.. code-block:: bash
mkdir -p <WORKDIR>/credentials/google
python -m platypush.plugins.google.credentials \
'gmail.modify' \
<WORKDIR>/credentials/google/client_secret.json
"""
scopes = ['https://www.googleapis.com/auth/gmail.modify']
@ -64,8 +94,7 @@ class GoogleMailPlugin(GooglePlugin):
content = fp.read()
if main_type == 'text':
# noinspection PyUnresolvedReferences
msg = mimetypes.MIMEText(content, _subtype=sub_type)
msg = MIMEText(str(content), _subtype=sub_type)
elif main_type == 'image':
msg = MIMEImage(content, _subtype=sub_type)
elif main_type == 'audio':

View file

@ -5,20 +5,25 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- httplib2
package: platypush.plugins.google.mail
type: plugin

View file

@ -14,18 +14,24 @@ datetime_types = Union[str, int, float, datetime]
class GoogleMapsPlugin(GooglePlugin):
"""
Plugins that provides utilities to interact with Google Maps API services.
It requires you to create a Google application - you can create one at the
`developers console <https://console.developers.google.com>`_.
After that, you'll need to create a new API key from the _Credentials_ tab.
This integration doesn't require any additional scopes.
"""
scopes = []
def __init__(self, api_key, *args, **kwargs):
def __init__(self, api_key: str, **kwargs):
"""
:param api_key: Server-side API key to be used for the requests, get one at
https://console.developers.google.com
:type api_key: str
"""
super().__init__(scopes=self.scopes, *args, **kwargs)
super().__init__(scopes=self.scopes, **kwargs)
self.api_key = api_key
@action
@ -43,7 +49,7 @@ class GoogleMapsPlugin(GooglePlugin):
response = requests.get(
'https://maps.googleapis.com/maps/api/geocode/json',
params={
'latlng': '{},{}'.format(latitude, longitude),
'latlng': f'{latitude},{longitude}',
'key': self.api_key,
},
).json()
@ -59,9 +65,10 @@ class GoogleMapsPlugin(GooglePlugin):
if 'results' in response and response['results']:
result = response['results'][0]
self.logger.info(
'Google Maps geocode response for latlng ({},{}): {}'.format(
latitude, longitude, result
)
'Google Maps geocode response for latlng (%f,%f): %s',
latitude,
longitude,
result,
)
address['address'] = result['formatted_address'].split(',')[0]
@ -91,7 +98,7 @@ class GoogleMapsPlugin(GooglePlugin):
response = requests.get(
'https://maps.googleapis.com/maps/api/elevation/json',
params={
'locations': '{},{}'.format(latitude, longitude),
'locations': f'{latitude},{longitude}',
'key': self.api_key,
},
).json()
@ -192,16 +199,21 @@ class GoogleMapsPlugin(GooglePlugin):
"""
rs = requests.get(
'https://maps.googleapis.com/maps/api/distancematrix/json',
timeout=20,
params={
'origins': '|'.join(origins),
'destinations': '|'.join(destinations),
'units': units,
**(
{'departure_time': to_datetime(departure_time)}
{'departure_time': to_datetime(departure_time).isoformat()}
if departure_time
else {}
),
**({'arrival_time': to_datetime(arrival_time)} if arrival_time else {}),
**(
{'arrival_time': to_datetime(arrival_time).isoformat()}
if arrival_time
else {}
),
**({'avoid': '|'.join(avoid)} if avoid else {}),
**({'language': language} if language else {}),
**({'mode': mode} if mode else {}),

View file

@ -5,20 +5,25 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- httplib2
package: platypush.plugins.google.maps
type: plugin

View file

@ -9,15 +9,18 @@ class GooglePubsubPlugin(Plugin):
Send messages over a Google pub/sub instance.
You'll need a Google Cloud active project and a set of credentials to use this plugin:
1. Create a project on the `Google Cloud console <https://console.cloud.google.com/projectcreate>`_ if
you don't have one already.
1. Create a project on the `Google Cloud console
<https://console.cloud.google.com/projectcreate>`_ if you don't have
one already.
2. In the `Google Cloud API console <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
create a new service account key. Select "New Service Account", choose the role "Pub/Sub Editor" and leave
the key type as JSON.
2. In the `Google Cloud API console
<https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
create a new service account key. Select "New Service Account", choose
the role "Pub/Sub Editor" and leave the key type as JSON.
3. Download the JSON service credentials file. By default platypush will look for the credentials file under
~/.credentials/platypush/google/pubsub.json.
3. Download the JSON service credentials file. By default Platypush
will look for the credentials file under
``~/.credentials/platypush/google/pubsub.json``.
"""
@ -29,8 +32,8 @@ class GooglePubsubPlugin(Plugin):
def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
"""
:param credentials_file: Path to the JSON credentials file for Google pub/sub (default:
~/.credentials/platypush/google/pubsub.json)
:param credentials_file: Path to the JSON credentials file for Google
pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
"""
super().__init__(**kwargs)
self.credentials_file = credentials_file
@ -52,10 +55,13 @@ class GooglePubsubPlugin(Plugin):
"""
Sends a message to a topic
:param topic: Topic/channel where the message will be delivered. You can either specify the full topic name in
the format ``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
Google Pub/Sub project, or just ``<topic_name>`` - in such case it's implied that you refer to the
``topic_name`` under the ``project_id`` of your service credentials.
:param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
credentials.
:param msg: Message to be sent. It can be a list, a dict, or a Message object
:param kwargs: Extra arguments to be passed to .publish()
"""
@ -65,8 +71,8 @@ class GooglePubsubPlugin(Plugin):
credentials = self.get_credentials(self.publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials)
if not topic.startswith('projects/{}/topics/'.format(self.project_id)):
topic = 'projects/{}/topics/{}'.format(self.project_id, topic)
if not topic.startswith(f'projects/{self.project_id}/topics/'):
topic = f'projects/{self.project_id}/topics/{topic}'
try:
publisher.create_topic(topic)

View file

@ -5,21 +5,26 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- google-cloud-pubsub
- httplib2
package: platypush.plugins.google.pubsub
type: plugin

View file

@ -1,7 +1,6 @@
import os
from typing import Optional, List
# noinspection PyPackageRequirements
from google.cloud import translate_v2 as translate
from platypush.message.response.translate import TranslateResponse
@ -13,16 +12,19 @@ class GoogleTranslatePlugin(Plugin):
Plugin to interact with the Google Translate API.
You'll need a Google Cloud active project and a set of credentials to use this plugin:
1. Create a project on the `Google Cloud console <https://console.cloud.google.com/projectcreate>`_ if
you don't have one already.
1. Create a project on the `Google Cloud console
<https://console.cloud.google.com/projectcreate>`_ if you don't
have one already.
2. In the menu navigate to the *Artificial Intelligence* section and select *Translations* and enable the API.
2. In the menu navigate to the *Artificial Intelligence* section and
select *Translations* and enable the API.
3. From the menu select *APIs & Services* and create a service account. You can leave role and permissions
empty.
3. From the menu select *APIs & Services* and create a service account.
You can leave role and permissions empty.
4. Create a new private JSON key for the service account and download it. By default platypush will look for the
credentials file under ``~/.credentials/platypush/google/translate.json``.
4. Create a new private JSON key for the service account and download
it. By default platypush will look for the credentials file under
``~/.credentials/platypush/google/translate.json``.
"""
@ -39,11 +41,14 @@ class GoogleTranslatePlugin(Plugin):
):
"""
:param target_language: Default target language (default: 'en').
:param credentials_file: Google service account JSON credentials file. If none is specified then the plugin will
search for the credentials file in the following order:
:param credentials_file: Google service account JSON credentials file.
If none is specified then the plugin will search for the credentials
file in the following order:
1. ``~/.credentials/platypush/google/translate.json``
2. Context from the ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.
2. Context from the ``GOOGLE_APPLICATION_CREDENTIALS``
environment variable.
"""
super().__init__(**kwargs)
self.target_language = target_language
@ -64,7 +69,7 @@ class GoogleTranslatePlugin(Plugin):
for i in range(min(pos, len(text) - 1), -1, -1):
if text[i] in [' ', '\t', ',', '.', ')', '>']:
return i
elif text[i] in ['(', '<']:
if text[i] in ['(', '<']:
return i - 1 if i > 0 else 0
return 0
@ -86,7 +91,6 @@ class GoogleTranslatePlugin(Plugin):
return parts
# noinspection PyShadowingBuiltins
@action
def translate(
self,
@ -121,7 +125,6 @@ class GoogleTranslatePlugin(Plugin):
if not result:
result = response
else:
# noinspection PyTypeChecker
result['translatedText'] += ' ' + response['translatedText']
return TranslateResponse(

View file

@ -5,21 +5,26 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- google-cloud-translate
- httplib2
package: platypush.plugins.google.translate
type: plugin

View file

@ -1,10 +1,41 @@
from typing import Collection, Optional, Union
from platypush.plugins import action
from platypush.plugins.google import GooglePlugin
class GoogleYoutubePlugin(GooglePlugin):
"""
r"""
YouTube plugin.
Requirements:
1. Create your Google application, if you don't have one already, on
the `developers console <https://console.developers.google.com>`_.
2. You may have to explicitly enable your user to use the app if the app
is created in test mode. Go to "OAuth consent screen" and add your user's
email address to the list of authorized users.
3. Select the scopes that you want to enable for your application, depending
on the integrations that you want to use.
See https://developers.google.com/identity/protocols/oauth2/scopes
for a list of the available scopes.
4. Click on "Credentials", then "Create credentials" -> "OAuth client ID".
5 Select "Desktop app", enter whichever name you like, and click "Create".
6. Click on the "Download JSON" icon next to your newly created client ID.
7. Generate a credentials file for the required scope:
.. code-block:: bash
mkdir -p <WORKDIR>/credentials/google
python -m platypush.plugins.google.credentials \
'youtube.readonly' \
<WORKDIR>/credentials/google/client_secret.json
"""
scopes = ['https://www.googleapis.com/auth/youtube.readonly']
@ -19,28 +50,28 @@ class GoogleYoutubePlugin(GooglePlugin):
super().__init__(scopes=self.scopes, *args, **kwargs)
@action
def search(self, parts=None, query='', types=None, max_results=25, **kwargs):
def search(
self,
parts: Optional[Union[str, Collection[str]]] = None,
query: str = '',
types: Optional[Union[str, Collection[str]]] = None,
max_results: int = 25,
**kwargs
):
"""
Search for YouTube content.
:param parts: List of parts to get (default: snippet).
See the `Getting started - Part <https://developers.google.com/youtube/v3/getting-started#part>`_.
:type parts: list[str] or str
See the `Getting started - Part
<https://developers.google.com/youtube/v3/getting-started#part>`_.
:param query: Query string (default: empty string)
:type query: str
:param types: List of types to retrieve (default: video).
See the `Getting started - Resources <https://developers.google.com/youtube/v3/getting-started#resources>`_.
:type types: list[str] or str
See the `Getting started - Resources
<https://developers.google.com/youtube/v3/getting-started#resources>`_.
:param max_results: Maximum number of items that will be returned (default: 25).
:type max_results: int
:param kwargs: Any extra arguments that will be transparently passed to the YouTube API.
See the `Getting started - parameters
<https://developers.google.com/youtube/v3/docs/search/list#parameters>`_.
:return: A list of YouTube resources.
See the `Getting started - Resource
<https://developers.google.com/youtube/v3/docs/search#resource>`_.

View file

@ -5,20 +5,25 @@ manifest:
- py3-google-api-python-client
- py3-google-auth
- py3-oauth2client
- py3-httplib2
apt:
- python3-google-auth
- python3-oauth2client
- python3-httplib2
dnf:
- python-google-api-client
- python-google-auth
- python-oauth2client
- python-httplib2
pacman:
- python-google-api-python-client
- python-google-auth
- python-oauth2client
- python-httplib2
pip:
- google-api-python-client
- google-auth
- oauth2client
- httplib2
package: platypush.plugins.google.youtube
type: plugin

View file

@ -4,6 +4,7 @@ import select
import subprocess
import threading
import time
from typing import Any, Collection, Dict, List, Optional
from platypush.context import get_bus
from platypush.message.response import Response
@ -39,10 +40,9 @@ class MediaMplayerPlugin(MediaPlugin):
def __init__(
self,
mplayer_bin=None,
mplayer_timeout=_mplayer_default_communicate_timeout,
args=None,
*argv,
mplayer_bin: Optional[str] = None,
mplayer_timeout: float = _mplayer_default_communicate_timeout,
args: Optional[Collection[str]] = None,
**kwargs,
):
"""
@ -52,21 +52,13 @@ class MediaMplayerPlugin(MediaPlugin):
:param mplayer_bin: Path to the MPlayer executable (default: search for
the first occurrence in your system PATH environment variable)
:type mplayer_bin: str
:param mplayer_timeout: Timeout in seconds to wait for more data
from MPlayer before considering a response ready (default: 0.5 seconds)
:type mplayer_timeout: float
:param subtitles: Path to the subtitles file
:type subtitles: str
:param args: Default arguments that will be passed to the MPlayer
executable
:type args: list
"""
super().__init__(*argv, **kwargs)
super().__init__(**kwargs)
self.args = args or []
self._init_mplayer_bin(mplayer_bin=mplayer_bin)
@ -106,17 +98,15 @@ class MediaMplayerPlugin(MediaPlugin):
try:
self._player.terminate()
except Exception as e:
self.logger.debug(
'Failed to quit mplayer before _exec: {}'.format(str(e))
)
self.logger.debug('Failed to quit mplayer before _exec: %s', e)
mplayer_args = mplayer_args or []
m_args = mplayer_args or []
args = [self.mplayer_bin] + self._mplayer_bin_default_args
for arg in self.args + mplayer_args:
for arg in (*self.args, *m_args):
if arg not in args:
args.append(arg)
popen_args = {
popen_args: Dict[str, Any] = {
'stdin': subprocess.PIPE,
'stdout': subprocess.PIPE,
}
@ -140,10 +130,13 @@ class MediaMplayerPlugin(MediaPlugin):
def args_pprint(txt):
lc = txt.lower()
if lc[0] == '[':
return '%s=None' % lc[1:-1]
return f'{lc[1:-1]}=None'
return lc
while True:
if not mplayer.stdout:
break
line = mplayer.stdout.readline()
if not line:
break
@ -153,7 +146,7 @@ class MediaMplayerPlugin(MediaPlugin):
args = line.split()
cmd_name = args.pop(0)
arguments = ', '.join([args_pprint(a) for a in args])
self._actions[cmd_name] = '{}({})'.format(cmd_name, arguments)
self._actions[cmd_name] = f'{cmd_name}({arguments})'
def _exec(
self, cmd, *args, mplayer_args=None, prefix=None, wait_for_response=False
@ -161,22 +154,27 @@ class MediaMplayerPlugin(MediaPlugin):
cmd_name = cmd
response = None
if cmd_name == 'loadfile' or cmd_name == 'loadlist':
if cmd_name in {'loadfile', 'loadlist'}:
self._init_mplayer(mplayer_args)
else:
if not self._player:
self.logger.warning('MPlayer is not running')
cmd = '{}{}{}{}\n'.format(
prefix + ' ' if prefix else '',
cmd_name,
' ' if args else '',
' '.join(repr(a) for a in args),
cmd = (
f'{prefix + " " if prefix else ""}'
+ cmd_name
+ (" " if args else "")
+ " ".join(repr(a) for a in args)
+ '\n'
).encode()
if not self._player:
self.logger.warning('Cannot send command %s: player unavailable', cmd)
return
if not self._player.stdin:
self.logger.warning(
'Cannot send command {}: player unavailable'.format(cmd)
'Could not communicate with the mplayer process: the stdin is closed'
)
return
@ -199,6 +197,12 @@ class MediaMplayerPlugin(MediaPlugin):
if not wait_for_response:
return
if not (self._player and self._player.stdout):
self.logger.warning(
'Could not communicate with the mplayer process: the stdout is closed'
)
return
poll = select.poll()
poll.register(self._player.stdout, select.POLLIN)
last_read_time = time.time()
@ -209,11 +213,16 @@ class MediaMplayerPlugin(MediaPlugin):
if not self._player:
break
line = self._player.stdout.readline().decode()
buf = self._player.stdout.readline()
line = buf.decode() if isinstance(buf, bytes) else buf
last_read_time = time.time()
if line.startswith('ANS_'):
m = re.match('^([^=]+)=(.*)$', line[4:])
if not m:
self.logger.warning('Unexpected response: %s', line)
break
k, v = m.group(1), m.group(2)
v = v.strip()
if v == 'yes':
@ -222,7 +231,8 @@ class MediaMplayerPlugin(MediaPlugin):
v = False
try:
v = eval(v)
if isinstance(v, str):
v = eval(v) # pylint: disable=eval-used
except Exception:
pass
@ -272,25 +282,26 @@ class MediaMplayerPlugin(MediaPlugin):
bus.post(evt_type(player='local', plugin='media.mplayer', **evt))
@action
def play(self, resource, subtitles=None, mplayer_args=None):
def play(
self,
resource: str,
subtitles: Optional[str] = None,
mplayer_args: Optional[List[str]] = None,
):
"""
Play a resource.
:param resource: Resource to play - can be a local file or a remote URL
:type resource: str
:param subtitles: Path to optional subtitle file
:type subtitles: str
:param mplayer_args: Extra runtime arguments that will be passed to the
MPlayer executable
:type mplayer_args: list[str]
"""
self._post_event(MediaPlayRequestEvent, resource=resource)
if subtitles:
mplayer_args = mplayer_args or []
mplayer_args += ['-sub', self.get_subtitles_file(subtitles)]
subs = self.get_subtitles_file(subtitles)
if subs:
mplayer_args = list(mplayer_args or []) + ['-sub', subs]
resource = self._get_resource(resource)
if resource.startswith('file://'):
@ -305,67 +316,78 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
def pause(self):
def pause(self, *_, **__):
"""Toggle the paused state"""
self._exec('pause')
self._post_event(MediaPauseEvent)
return self.status()
@action
def stop(self):
def stop(self, *_, **__):
"""Stop the playback"""
# return self._exec('stop')
self.quit()
return self.status()
@action
def quit(self):
def quit(self, *_, **__):
"""Quit the player"""
self._exec('quit')
self._post_event(MediaStopEvent)
return self.status()
@action
def voldown(self, step=10.0):
def voldown(self, *_, step=10.0, **__):
"""Volume down by (default: 10)%"""
self._exec('volume', -step * 10)
return self.status()
@action
def volup(self, step=10.0):
def volup(self, *_, step=10.0, **__):
"""Volume up by (default: 10)%"""
self._exec('volume', step * 10)
return self.status()
@action
def back(self, offset=30.0):
def back(self, *_, offset=30.0, **__):
"""Back by (default: 30) seconds"""
self.step_property('time_pos', -offset)
return self.status()
@action
def forward(self, offset=30.0):
def forward(self, *_, offset=30.0, **__):
"""Forward by (default: 30) seconds"""
self.step_property('time_pos', offset)
return self.status()
@action
def toggle_subtitles(self):
def toggle_subtitles(self, *_, **__):
"""Toggle the subtitles visibility"""
subs = self.get_property('sub_visibility').output.get('sub_visibility')
response: dict = (
self.get_property('sub_visibility').output or {} # type: ignore
)
subs = response.get('sub_visibility')
self._exec('sub_visibility', int(not subs))
return self.status()
@action
def add_subtitles(self, filename, **__):
"""Sets media subtitles from filename"""
def add_subtitles(self, filename: str, **__):
"""
Sets media subtitles from filename
:param filename: Subtitles file.
"""
self._exec('sub_visibility', 1)
self._exec('sub_load', filename)
return self.status()
@action
def remove_subtitles(self, index=None):
"""Removes the subtitle specified by the index (default: all)"""
def remove_subtitles(self, *_, index: Optional[int] = None, **__):
"""
Removes the subtitle specified by the index (default: all)
:param index: (1-based) index of the subtitles track to remove.
"""
if index is None:
self._exec('sub_remove')
else:
@ -374,14 +396,15 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
def is_playing(self):
def is_playing(self, *_, **__):
"""
:returns: True if it's playing, False otherwise
"""
return self.get_property('pause').output.get('pause') is False
response: dict = self.get_property('pause').output or {} # type: ignore
return response.get('pause') is False
@action
def load(self, resource, mplayer_args=None, **kwargs):
def load(self, resource, *_, mplayer_args: Optional[Collection[str]] = None, **__):
"""
Load a resource/video in the player.
"""
@ -390,13 +413,13 @@ class MediaMplayerPlugin(MediaPlugin):
return self.play(resource, mplayer_args=mplayer_args)
@action
def mute(self):
def mute(self, *_, **__):
"""Toggle mute state"""
self._exec('mute')
return self.status()
@action
def seek(self, position):
def seek(self, position: float, *_, **__):
"""
Seek backward/forward by the specified number of seconds
@ -407,7 +430,7 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
def set_position(self, position):
def set_position(self, position: float, *_, **__):
"""
Seek backward/forward to the specified absolute position
@ -418,7 +441,7 @@ class MediaMplayerPlugin(MediaPlugin):
return self.status()
@action
def set_volume(self, volume):
def set_volume(self, volume: float, *_, **__):
"""
Set the volume
@ -463,7 +486,7 @@ class MediaMplayerPlugin(MediaPlugin):
with self._status_lock:
for prop, player_prop in props.items():
value = self.get_property(player_prop).output
if value is not None:
if isinstance(value, dict):
status[prop] = value.get(player_prop)
status['seekable'] = bool(status['duration'])
@ -480,7 +503,11 @@ class MediaMplayerPlugin(MediaPlugin):
return status
@action
def get_property(self, property, args=None):
def get_property(
self,
property: str, # pylint: disable=redefined-builtin
args: Optional[Collection[str]] = None,
):
"""
Get a player property (e.g. pause, fullscreen etc.). See
https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the
@ -503,14 +530,23 @@ class MediaMplayerPlugin(MediaPlugin):
for k, v in result.items():
if k == 'ERROR' and v not in response.errors:
response.errors.append('{}{}: {}'.format(property, args, v))
if not isinstance(response.errors, list):
response.errors = []
response.errors.append(f'{property}{args}: {v}')
else:
if not isinstance(response.output, dict):
response.output = {}
response.output[k] = v
return response
@action
def set_property(self, property, value, args=None):
def set_property(
self,
property: str, # pylint: disable=redefined-builtin
value: Any,
args: Optional[Collection[str]] = None,
):
"""
Set a player property (e.g. pause, fullscreen etc.). See
https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the
@ -534,14 +570,25 @@ class MediaMplayerPlugin(MediaPlugin):
for k, v in result.items():
if k == 'ERROR' and v not in response.errors:
response.errors.append('{} {}{}: {}'.format(property, value, args, v))
if not isinstance(response.errors, list):
response.errors = []
response.errors.append(f'{property} {value}{args}: {v}')
else:
if not isinstance(response.output, dict):
response.output = {}
response.output[k] = v
return response
@action
def step_property(self, property, value, args=None):
def step_property(
self,
property: str, # pylint: disable=redefined-builtin
value: Any,
*_,
args: Optional[Collection[str]] = None,
**__,
):
"""
Step a player property (e.g. volume, time_pos etc.). See
https://www.mplayerhq.hu/DOCS/tech/slave.txt for a full list of the
@ -565,13 +612,18 @@ class MediaMplayerPlugin(MediaPlugin):
for k, v in result.items():
if k == 'ERROR' and v not in response.errors:
response.errors.append('{} {}{}: {}'.format(property, value, args, v))
if not isinstance(response.errors, list):
response.errors = []
response.errors.append(f'{property} {value}{args}: {v}')
else:
if not isinstance(response.output, dict):
response.output = {}
response.output[k] = v
return response
def set_subtitles(self, filename, *args, **kwargs):
def set_subtitles(self, filename: str, *_, **__):
self.logger.debug('set_subtitles called with filename=%s', filename)
raise NotImplementedError

View file

@ -1,5 +1,7 @@
import enum
import threading
from typing import Collection, Optional
import urllib.parse
from platypush.context import get_bus
@ -16,6 +18,10 @@ from platypush.plugins import action
class PlayerEvent(enum.Enum):
"""
Supported player events.
"""
STOP = 'stop'
PLAY = 'play'
PAUSE = 'pause'
@ -26,17 +32,18 @@ class MediaOmxplayerPlugin(MediaPlugin):
Plugin to control video and media playback using OMXPlayer.
"""
def __init__(self, args=None, *argv, timeout: float = 20.0, **kwargs):
def __init__(
self, args: Optional[Collection[str]] = None, timeout: float = 20.0, **kwargs
):
"""
:param args: Arguments that will be passed to the OMXPlayer constructor
(e.g. subtitles, volume, start position, window size etc.) see
https://github.com/popcornmix/omxplayer#synopsis and
https://python-omxplayer-wrapper.readthedocs.io/en/latest/omxplayer/#omxplayer.player.OMXPlayer
:type args: list
:param timeout: How long the plugin should wait for a video to start upon play request (default: 20 seconds).
"""
super().__init__(*argv, **kwargs)
super().__init__(**kwargs)
if args is None:
args = []
@ -48,7 +55,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
self._play_started = threading.Event()
@action
def play(self, resource=None, subtitles=None, *args, **kwargs):
def play(self, *args, resource=None, subtitles=None, **_):
"""
Play or resume playing a resource.
@ -68,9 +75,8 @@ class MediaOmxplayerPlugin(MediaPlugin):
self._player.play()
return self.status()
else:
self._play_started.clear()
self._play_started.clear()
self._post_event(MediaPlayRequestEvent, resource=resource)
if subtitles:
@ -141,7 +147,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
return {'status': 'stop'}
def get_volume(self) -> float:
def get_volume(self) -> Optional[float]:
"""
:return: The player volume in percentage [0, 100].
"""
@ -157,7 +163,9 @@ class MediaOmxplayerPlugin(MediaPlugin):
:type step: float
"""
if self._player:
self.set_volume(max(0, self.get_volume() - step))
vol = self.get_volume()
if vol is not None:
self.set_volume(max(0, vol - step))
return self.status()
@action
@ -169,7 +177,9 @@ class MediaOmxplayerPlugin(MediaPlugin):
:type step: float
"""
if self._player:
self.set_volume(min(100, self.get_volume() + step))
vol = self.get_volume()
if vol is not None:
self.set_volume(min(100, vol + step))
return self.status()
@action
@ -213,23 +223,19 @@ class MediaOmxplayerPlugin(MediaPlugin):
return self.status()
@action
def is_playing(self):
def is_playing(self, *_, **__) -> bool:
"""
:returns: True if it's playing, False otherwise
"""
return self._player.is_playing()
return self._player.is_playing() if self._player else False
@action
def load(self, resource, pause=False, **kwargs):
def load(self, resource: str, *_, pause: bool = False, **__):
"""
Load a resource/video in the player.
:param resource: URL or filename to load
:type resource: str
:param pause: If set, load the video in paused mode (default: False)
:type pause: bool
"""
if self._player:
@ -244,48 +250,45 @@ class MediaOmxplayerPlugin(MediaPlugin):
return self.status()
@action
def mute(self):
def mute(self, *_, **__):
"""Mute the player"""
if self._player:
self._player.mute()
return self.status()
@action
def unmute(self):
def unmute(self, *_, **__):
"""Unmute the player"""
if self._player:
self._player.unmute()
return self.status()
@action
def seek(self, position):
def seek(self, position: float, **__):
"""
Seek to the specified number of seconds from the start.
:param position: Number of seconds from the start
:type position: float
"""
if self._player:
self._player.set_position(position)
return self.status()
@action
def set_position(self, position):
def set_position(self, position: float, **__):
"""
Seek to the specified number of seconds from the start (same as :meth:`.seek`).
:param position: Number of seconds from the start
:type position: float
"""
return self.seek(position)
@action
def set_volume(self, volume):
def set_volume(self, volume: float, *_, **__):
"""
Set the volume
:param volume: Volume value between 0 and 100
:type volume: float
"""
if self._player:
@ -327,7 +330,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
try:
state = self._player.playback_status().lower()
except (OMXPlayerDeadError, DBusException) as e:
self.logger.warning(f'Could not retrieve player status: {e}')
self.logger.warning('Could not retrieve player status: %s', e)
if isinstance(e, OMXPlayerDeadError):
self._player = None
@ -362,9 +365,7 @@ class MediaOmxplayerPlugin(MediaPlugin):
def add_handler(self, event_type, callback):
if event_type not in self._handlers.keys():
raise AttributeError(
'{} is not a valid PlayerEvent type'.format(event_type)
)
raise AttributeError(f'{event_type} is not a valid PlayerEvent type')
self._handlers[event_type].append(callback)
@ -420,13 +421,13 @@ class MediaOmxplayerPlugin(MediaPlugin):
self._player.positionEvent += self.on_seek()
self._player.seekEvent += self.on_seek()
def toggle_subtitles(self, *args, **kwargs):
def toggle_subtitles(self, *_, **__):
raise NotImplementedError
def set_subtitles(self, filename, *args, **kwargs):
def set_subtitles(self, *_, **__):
raise NotImplementedError
def remove_subtitles(self, *args, **kwargs):
def remove_subtitles(self, *_, **__):
raise NotImplementedError

View file

@ -1,7 +1,7 @@
import os
import threading
import urllib.parse
from typing import Optional
from typing import Collection, Optional
from platypush.context import get_bus
from platypush.plugins.media import PlayerState, MediaPlugin
@ -24,23 +24,22 @@ class MediaVlcPlugin(MediaPlugin):
Plugin to control VLC instances.
"""
def __init__(self, args=None, fullscreen=False, volume=100, *argv, **kwargs):
def __init__(
self,
args: Optional[Collection[str]] = None,
fullscreen: bool = False,
volume: int = 100,
**kwargs
):
"""
Create the vlc wrapper.
:param args: List of extra arguments to pass to the VLC executable (e.g.
``['--sub-language=en', '--snapshot-path=/mnt/snapshots']``)
:type args: list[str]
:param fullscreen: Set to True if you want media files to be opened in
fullscreen by default (can be overridden by `.play()`) (default: False)
:type fullscreen: bool
:param volume: Default media volume (default: 100)
:type volume: int
"""
super().__init__(*argv, **kwargs)
super().__init__(**kwargs)
self._args = args or []
self._instance = None
@ -98,6 +97,7 @@ class MediaVlcPlugin(MediaPlugin):
self._monitor_thread = threading.Thread(target=self._player_monitor)
self._monitor_thread.start()
self._instance = vlc.Instance(*self._args)
assert self._instance, 'Could not create a VLC instance'
self._player = self._instance.media_player_new(resource)
for evt in self._watched_event_types():
@ -136,65 +136,67 @@ class MediaVlcPlugin(MediaPlugin):
def callback(event):
from vlc import EventType
self.logger.debug('Received vlc event: {}'.format(event))
if event.type == EventType.MediaPlayerPlaying:
self.logger.debug('Received vlc event: %s', event)
if event.type == EventType.MediaPlayerPlaying: # type: ignore
self._post_event(MediaPlayEvent, resource=self._get_current_resource())
elif event.type == EventType.MediaPlayerPaused:
elif event.type == EventType.MediaPlayerPaused: # type: ignore
self._post_event(MediaPauseEvent)
elif (
event.type == EventType.MediaPlayerStopped
or event.type == EventType.MediaPlayerEndReached
event.type == EventType.MediaPlayerStopped # type: ignore
or event.type == EventType.MediaPlayerEndReached # type: ignore
):
self._on_stop_event.set()
self._post_event(MediaStopEvent)
for cbk in self._on_stop_callbacks:
cbk()
elif (
event.type == EventType.MediaPlayerTitleChanged
or event.type == EventType.MediaPlayerMediaChanged
elif self._player and (
event.type
in (
EventType.MediaPlayerTitleChanged, # type: ignore
EventType.MediaPlayerMediaChanged, # type: ignore
)
):
self._title = self._player.get_title() or self._filename
if event.type == EventType.MediaPlayerMediaChanged:
if event.type == EventType.MediaPlayerMediaChanged: # type: ignore
self._post_event(NewPlayingMediaEvent, resource=self._title)
elif event.type == EventType.MediaPlayerLengthChanged:
elif event.type == EventType.MediaPlayerLengthChanged: # type: ignore
self._post_event(
NewPlayingMediaEvent, resource=self._get_current_resource()
)
elif event.type == EventType.MediaPlayerTimeChanged:
elif self._player and event.type == EventType.MediaPlayerTimeChanged: # type: ignore
pos = float(self._player.get_time() / 1000)
if self._latest_seek is None or abs(pos - self._latest_seek) > 5:
self._post_event(MediaSeekEvent, position=pos)
self._latest_seek = pos
elif event.type == EventType.MediaPlayerAudioVolume:
elif self._player and event.type == EventType.MediaPlayerAudioVolume: # type: ignore
self._post_event(
MediaVolumeChangedEvent, volume=self._player.audio_get_volume()
)
elif event.type == EventType.MediaPlayerMuted:
elif event.type == EventType.MediaPlayerMuted: # type: ignore
self._post_event(MediaMuteChangedEvent, mute=True)
elif event.type == EventType.MediaPlayerUnmuted:
elif event.type == EventType.MediaPlayerUnmuted: # type: ignore
self._post_event(MediaMuteChangedEvent, mute=False)
return callback
@action
def play(self, resource=None, subtitles=None, fullscreen=None, volume=None):
def play(
self,
resource: Optional[str] = None,
subtitles: Optional[str] = None,
fullscreen: Optional[bool] = None,
volume: Optional[int] = None,
):
"""
Play a resource.
:param resource: Resource to play - can be a local file or a remote URL (default: None == toggle play).
:type resource: str
:param resource: Resource to play - can be a local file or a remote URL
(default: None == toggle play).
:param subtitles: Path to optional subtitle file
:type subtitles: str
:param fullscreen: Set to explicitly enable/disable fullscreen (default:
`fullscreen` configured value or False)
:type fullscreen: bool
:param volume: Set to explicitly set the playback volume (default:
`volume` configured value or 100)
:type fullscreen: bool
"""
if not resource:
@ -208,12 +210,14 @@ class MediaVlcPlugin(MediaPlugin):
self._filename = resource
self._init_vlc(resource)
if subtitles:
if subtitles and self._player:
if subtitles.startswith('file://'):
subtitles = subtitles[len('file://') :]
self._player.video_set_subtitle_file(subtitles)
if self._player:
self._player.play()
if self.volume:
self.set_volume(volume=self.volume)
@ -226,71 +230,60 @@ class MediaVlcPlugin(MediaPlugin):
return self.status()
@action
def pause(self):
def pause(self, *_, **__):
"""Toggle the paused state"""
if not self._player:
return None, 'No vlc instance is running'
if not self._player.can_pause():
return None, 'The specified media type cannot be paused'
assert self._player, 'No vlc instance is running'
assert self._player.can_pause(), 'The specified media type cannot be paused'
self._player.pause()
return self.status()
@action
def quit(self):
def quit(self, *_, **__):
"""Quit the player (same as `stop`)"""
with self._stop_lock:
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
self._player.stop()
self._on_stop_event.wait(timeout=5)
self._reset_state()
return self.status()
@action
def stop(self):
def stop(self, *_, **__):
"""Stop the application (same as `quit`)"""
return self.quit()
@action
def voldown(self, step=10.0):
def voldown(self, *_, step: float = 10.0, **__):
"""Volume down by (default: 10)%"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
return self.set_volume(int(max(0, self._player.audio_get_volume() - step)))
@action
def volup(self, step=10.0):
def volup(self, *_, step: float = 10.0, **__):
"""Volume up by (default: 10)%"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
return self.set_volume(int(min(100, self._player.audio_get_volume() + step)))
@action
def set_volume(self, volume):
def set_volume(self, volume: int):
"""
Set the volume
:param volume: Volume value between 0 and 100
:type volume: float
"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
volume = max(0, min([100, volume]))
self._player.audio_set_volume(volume)
status = self.status().output
status: dict = self.status().output # type: ignore
status['volume'] = volume
return status
@action
def seek(self, position):
def seek(self, position: float):
"""
Seek backward/forward by the specified number of seconds
:param position: Number of seconds relative to the current cursor
:type position: int
"""
if not self._player:
return None, 'No vlc instance is running'
@ -306,7 +299,7 @@ class MediaVlcPlugin(MediaPlugin):
return self.status()
@action
def back(self, offset=30.0):
def back(self, *_, offset: float = 30.0, **__):
"""Back by (default: 30) seconds"""
if not self._player:
return None, 'No vlc instance is running'
@ -319,7 +312,7 @@ class MediaVlcPlugin(MediaPlugin):
return self.seek(pos)
@action
def forward(self, offset=30.0):
def forward(self, *_, offset: float = 30.0, **__):
"""Forward by (default: 30) seconds"""
if not self._player:
return None, 'No vlc instance is running'
@ -334,13 +327,12 @@ class MediaVlcPlugin(MediaPlugin):
return self.seek(pos)
@action
def toggle_subtitles(self, visibile=None):
def toggle_subtitles(self, *_, **__):
"""Toggle the subtitles visibility"""
if not self._player:
return None, 'No vlc instance is running'
if self._player.video_get_spu_count() == 0:
return None, 'The media file has no subtitles set'
assert self._player, 'No vlc instance is running'
assert (
self._player.video_get_spu_count() > 0
), 'The media file has no subtitles set'
if self._player.video_get_spu() is None or self._player.video_get_spu() == -1:
self._player.video_set_spu(0)
@ -350,36 +342,32 @@ class MediaVlcPlugin(MediaPlugin):
@action
def toggle_fullscreen(self):
"""Toggle the fullscreen mode"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
self._player.toggle_fullscreen()
@action
def set_fullscreen(self, fullscreen=True):
def set_fullscreen(self, fullscreen: bool = True):
"""Set fullscreen mode"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
self._player.set_fullscreen(fullscreen)
@action
def set_subtitles(self, filename, **args):
def set_subtitles(self, filename: str, *_, **__):
"""Sets media subtitles from filename"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
if filename.startswith('file://'):
filename = filename[len('file://') :]
self._player.video_set_subtitle_file(filename)
@action
def remove_subtitles(self):
def remove_subtitles(self, *_, **__):
"""Removes (hides) the subtitles"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
self._player.video_set_spu(-1)
@action
def is_playing(self):
def is_playing(self, *_, **__):
"""
:returns: True if it's playing, False otherwise
"""
@ -388,7 +376,7 @@ class MediaVlcPlugin(MediaPlugin):
return self._player.is_playing()
@action
def load(self, resource, **args):
def load(self, resource, *_, **args):
"""
Load/queue a resource/video to the player
"""
@ -398,14 +386,13 @@ class MediaVlcPlugin(MediaPlugin):
return self.status()
@action
def mute(self):
def mute(self, *_, **__):
"""Toggle mute state"""
if not self._player:
return None, 'No vlc instance is running'
assert self._player, 'No vlc instance is running'
self._player.audio_toggle_mute()
@action
def set_position(self, position):
def set_position(self, position: float, **_):
"""
Seek backward/forward to the specified absolute position (same as ``seek``)
"""
@ -434,9 +421,9 @@ class MediaVlcPlugin(MediaPlugin):
status = {}
vlc_state = self._player.get_state()
if vlc_state == vlc.State.Playing:
if vlc_state == vlc.State.Playing: # type: ignore
status['state'] = PlayerState.PLAY.value
elif vlc_state == vlc.State.Paused:
elif vlc_state == vlc.State.Paused: # type: ignore
status['state'] = PlayerState.PAUSE.value
else:
status['state'] = PlayerState.STOP.value
@ -446,6 +433,7 @@ class MediaVlcPlugin(MediaPlugin):
if self._player.get_media()
else None
)
status['position'] = (
float(self._player.get_time() / 1000)
if self._player.get_time() is not None
@ -477,7 +465,7 @@ class MediaVlcPlugin(MediaPlugin):
def _get_current_resource(self):
if not self._player or not self._player.get_media():
return
return None
return self._player.get_media().get_mrl()

View file

@ -1,7 +1,7 @@
import re
import threading
import time
from typing import Optional, Union
from typing import Collection, Optional, Union
from platypush.plugins import action
from platypush.plugins.music import MusicPlugin
@ -9,27 +9,29 @@ from platypush.plugins.music import MusicPlugin
class MusicMpdPlugin(MusicPlugin):
"""
This plugin allows you to interact with an MPD/Mopidy music server. MPD
(https://www.musicpd.org/) is a flexible server-side protocol/application
for handling music collections and playing music, mostly aimed to manage
local libraries. Mopidy (https://www.mopidy.com/) is an evolution of MPD,
compatible with the original protocol and with support for multiple music
sources through plugins (e.g. Spotify, TuneIn, Soundcloud, local files
etc.).
This plugin allows you to interact with an MPD/Mopidy music server.
`MPD <https://www.musicpd.org/>`_ is a flexible server-side
protocol/application for handling music collections and playing music,
mostly aimed to manage local libraries.
`Mopidy <https://www.mopidy.com/>`_ is an evolution of MPD, compatible with
the original protocol and with support for multiple music sources through
plugins (e.g. Spotify, TuneIn, Soundcloud, local files etc.).
.. note:: As of Mopidy 3.0 MPD is an optional interface provided by the
``mopidy-mpd`` extension. Make sure that you have the extension
installed and enabled on your instance to use this plugin with your
server.
**NOTE**: As of Mopidy 3.0 MPD is an optional interface provided by the ``mopidy-mpd`` extension. Make sure that you
have the extension installed and enabled on your instance to use this plugin with your server.
"""
_client_lock = threading.RLock()
def __init__(self, host, port=6600):
def __init__(self, host: str, port: int = 6600):
"""
:param host: MPD IP/hostname
:type host: str
:param port: MPD port (default: 6600)
:type port: int
"""
super().__init__()
@ -37,12 +39,12 @@ class MusicMpdPlugin(MusicPlugin):
self.port = port
self.client = None
def _connect(self, n_tries=2):
def _connect(self, n_tries: int = 2):
import mpd
with self._client_lock:
if self.client:
return
return self.client
error = None
while n_tries > 0:
@ -54,9 +56,9 @@ class MusicMpdPlugin(MusicPlugin):
except Exception as e:
error = e
self.logger.warning(
'Connection exception: {}{}'.format(
str(e), (': Retrying' if n_tries > 0 else '')
)
'Connection exception: %s%s',
e,
(': Retrying' if n_tries > 0 else ''),
)
time.sleep(0.5)
@ -64,7 +66,9 @@ class MusicMpdPlugin(MusicPlugin):
if error:
raise error
def _exec(self, method, *args, **kwargs):
return self.client
def _exec(self, method: str, *args, **kwargs):
error = None
n_tries = int(kwargs.pop('n_tries')) if 'n_tries' in kwargs else 2
return_status = (
@ -84,16 +88,16 @@ class MusicMpdPlugin(MusicPlugin):
except Exception as e:
error = str(e)
self.logger.warning(
'Exception while executing MPD method {}: {}'.format(method, error)
'Exception while executing MPD method %s: %s', method, error
)
self.client = None
return None, error
@action
def play(self, resource=None):
def play(self, resource: Optional[str] = None, **__):
"""
Play a resource by path/URI
Play a resource by path/URI.
:param resource: Resource path/URI
:type resource: str
@ -106,213 +110,184 @@ class MusicMpdPlugin(MusicPlugin):
return self._exec('play')
@action
def play_pos(self, pos):
def play_pos(self, pos: int):
"""
Play a track in the current playlist by position number
Play a track in the current playlist by position number.
:param pos: Position number
:param pos: Position number.
"""
return self._exec('play', pos)
@action
def pause(self):
def pause(self, *_, **__):
"""Pause playback"""
status = self.status().output['state']
if status == 'play':
return self._exec('pause')
else:
return self._exec('play')
status = self._status()['state']
return self._exec('pause') if status == 'play' else self._exec('play')
@action
def pause_if_playing(self):
"""Pause playback only if it's playing"""
status = self.status().output['state']
if status == 'play':
return self._exec('pause')
status = self._status()['state']
return self._exec('pause') if status == 'play' else None
@action
def play_if_paused(self):
"""Play only if it's paused (resume)"""
status = self.status().output['state']
if status == 'pause':
return self._exec('play')
status = self._status()['state']
return self._exec('play') if status == 'pause' else None
@action
def play_if_paused_or_stopped(self):
"""Play only if it's paused or stopped"""
status = self.status().output['state']
if status == 'pause' or status == 'stop':
return self._exec('play')
status = self._status()['state']
return self._exec('play') if status in ('pause', 'stop') else None
@action
def stop(self):
def stop(self, *_, **__):
"""Stop playback"""
return self._exec('stop')
@action
def play_or_stop(self):
"""Play or stop (play state toggle)"""
status = self.status().output['state']
status = self._status()['state']
if status == 'play':
return self._exec('stop')
else:
return self._exec('play')
@action
def playid(self, track_id):
def playid(self, track_id: str):
"""
Play a track by ID
Play a track by ID.
:param track_id: Track ID
:type track_id: str
:param track_id: Track ID.
"""
return self._exec('playid', track_id)
@action
def next(self):
def next(self, *_, **__):
"""Play the next track"""
return self._exec('next')
@action
def previous(self):
def previous(self, *_, **__):
"""Play the previous track"""
return self._exec('previous')
@action
def setvol(self, vol):
def setvol(self, vol: int):
"""
Set the volume (DEPRECATED, use :meth:`.set_volume` instead).
Set the volume.
:param vol: Volume value (range: 0-100)
:type vol: int
..warning :: **DEPRECATED**, use :meth:`.set_volume` instead.
:param vol: Volume value (range: 0-100).
"""
return self.set_volume(vol)
@action
def set_volume(self, volume):
def set_volume(self, volume: int, *_, **__):
"""
Set the volume.
:param volume: Volume value (range: 0-100)
:type volume: int
:param volume: Volume value (range: 0-100).
"""
return self._exec('setvol', str(volume))
@action
def volup(self, delta=10):
def volup(self, *_, delta: int = 10, **__):
"""
Turn up the volume
Turn up the volume.
:param delta: Volume up delta (default: +10%)
:type delta: int
:param delta: Volume up delta (default: +10%).
"""
volume = int(self.status().output['volume'])
volume = int(self._status()['volume'])
new_volume = min(volume + delta, 100)
return self.setvol(new_volume)
@action
def voldown(self, delta=10):
def voldown(self, *_, delta: int = 10, **__):
"""
Turn down the volume
Turn down the volume.
:param delta: Volume down delta (default: -10%)
:type delta: int
:param delta: Volume down delta (default: -10%).
"""
volume = int(self.status().output['volume'])
volume = int(self._status()['volume'])
new_volume = max(volume - delta, 0)
return self.setvol(new_volume)
@action
def random(self, value=None):
"""
Set random mode
:param value: If set, set the random state this value (true/false). Default: None (toggle current state)
:type value: bool
"""
def _toggle(self, key: str, value: Optional[bool] = None):
if value is None:
value = int(self.status().output['random'])
value = 1 if value == 0 else 0
return self._exec('random', value)
value = bool(self._status()[key])
return self._exec(key, int(value))
@action
def consume(self, value=None):
def random(self, value: Optional[bool] = None):
"""
Set consume mode
Set random mode.
:param value: If set, set the consume state this value (true/false). Default: None (toggle current state)
:type value: bool
:param value: If set, set the random state this value (true/false).
Default: None (toggle current state).
"""
if value is None:
value = int(self.status().output['consume'])
value = 1 if value == 0 else 0
return self._exec('consume', value)
return self._toggle('random', value)
@action
def single(self, value=None):
def consume(self, value: Optional[bool] = None):
"""
Set single mode
Set consume mode.
:param value: If set, set the consume state this value (true/false). Default: None (toggle current state)
:type value: bool
:param value: If set, set the consume state this value (true/false).
Default: None (toggle current state)
"""
if value is None:
value = int(self.status().output['single'])
value = 1 if value == 0 else 0
return self._exec('single', value)
return self._toggle('consume', value)
@action
def repeat(self, value=None):
def single(self, value: Optional[bool] = None):
"""
Set repeat mode
Set single mode.
:param value: If set, set the repeat state this value (true/false). Default: None (toggle current state)
:type value: bool
:param value: If set, set the consume state this value (true/false).
Default: None (toggle current state)
"""
return self._toggle('single', value)
if value is None:
value = int(self.status().output['repeat'])
value = 1 if value == 0 else 0
return self._exec('repeat', value)
@action
def repeat(self, value: Optional[bool] = None):
"""
Set repeat mode.
:param value: If set, set the repeat state this value (true/false).
Default: None (toggle current state)
"""
return self._toggle('repeat', value)
@action
def shuffle(self):
"""
Shuffles the current playlist
Shuffles the current playlist.
"""
return self._exec('shuffle')
@action
def save(self, name):
def save(self, name: str):
"""
Save the current tracklist to a new playlist with the specified name
Save the current tracklist to a new playlist with the specified name.
:param name: Name of the playlist
:type name: str
"""
return self._exec('save', name)
@action
def add(self, resource, position=None):
def add(self, resource: str, *_, position: Optional[int] = None, **__):
"""
Add a resource (track, album, artist, folder etc.) to the current playlist
Add a resource (track, album, artist, folder etc.) to the current
playlist.
:param resource: Resource path or URI
:type resource: str
:param position: Position where the track(s) will be inserted (default: end of the playlist)
:type position: int
:param resource: Resource path or URI.
:param position: Position where the track(s) will be inserted (default:
end of the playlist).
"""
if isinstance(resource, list):
@ -324,7 +299,7 @@ class MusicMpdPlugin(MusicPlugin):
else:
self._exec('addid', r, position)
except Exception as e:
self.logger.warning('Could not add {}: {}'.format(r, e))
self.logger.warning('Could not add %s: %s', r, e)
return self.status().output
@ -361,7 +336,7 @@ class MusicMpdPlugin(MusicPlugin):
if isinstance(playlist, str):
playlist = [playlist]
elif not isinstance(playlist, list):
raise RuntimeError('Invalid type for playlist: {}'.format(type(playlist)))
raise RuntimeError(f'Invalid type for playlist: {type(playlist)}')
for p in playlist:
self._exec('rm', p)
@ -382,11 +357,11 @@ class MusicMpdPlugin(MusicPlugin):
@classmethod
def _parse_resource(cls, resource):
if not resource:
return
return None
m = re.search(r'^https?://open\.spotify\.com/([^?]+)', resource)
if m:
resource = 'spotify:{}'.format(m.group(1).replace('/', ':'))
resource = 'spotify:' + m.group(1).replace('/', ':')
if resource.startswith('spotify:'):
resource = resource.split('?')[0]
@ -415,46 +390,59 @@ class MusicMpdPlugin(MusicPlugin):
return ret
@action
def clear(self):
def clear(self, *_, **__):
"""Clear the current playlist"""
return self._exec('clear')
@action
def seekcur(self, value):
def seekcur(self, value: float):
"""
Seek to the specified position (DEPRECATED, use :meth:`.seek` instead).
:param value: Seek position in seconds, or delta string (e.g. '+15' or '-15') to indicate a seek relative to
the current position :type value: int
:param value: Seek position in seconds, or delta string (e.g. '+15' or
'-15') to indicate a seek relative to the current position
"""
return self.seek(value)
@action
def seek(self, position):
def seek(self, position: float, *_, **__):
"""
Seek to the specified position
:param position: Seek position in seconds, or delta string (e.g. '+15' or '-15') to indicate a seek relative
to the current position :type position: int
:param position: Seek position in seconds, or delta string (e.g. '+15'
or '-15') to indicate a seek relative to the current position
"""
return self._exec('seekcur', position)
@action
def forward(self):
"""Go forward by 15 seconds"""
return self._exec('seekcur', '+15')
@action
def back(self):
"""Go backward by 15 seconds"""
return self._exec('seekcur', '-15')
def _status(self) -> dict:
n_tries = 2
error = None
while n_tries > 0:
try:
n_tries -= 1
self._connect()
if self.client:
return self.client.status() # type: ignore
except Exception as e:
error = e
self.logger.warning('Exception while getting MPD status: %s', e)
self.client = None
raise AssertionError(str(error))
@action
def status(self):
def status(self, *_, **__):
"""
:returns: The current state.
@ -480,24 +468,7 @@ class MusicMpdPlugin(MusicPlugin):
}
"""
n_tries = 2
error = None
while n_tries > 0:
try:
n_tries -= 1
self._connect()
if self.client:
return self.client.status()
except Exception as e:
error = e
self.logger.warning(
'Exception while getting MPD status: {}'.format(str(e))
)
self.client = None
return None, error
return self._status()
@action
def currentsong(self):
@ -506,9 +477,8 @@ class MusicMpdPlugin(MusicPlugin):
"""
return self.current_track()
# noinspection PyTypeChecker
@action
def current_track(self):
def current_track(self, *_, **__):
"""
:returns: The currently played track.
@ -530,6 +500,9 @@ class MusicMpdPlugin(MusicPlugin):
"""
track = self._exec('currentsong', return_status=False)
if not isinstance(track, dict):
return None
if 'title' in track and (
'artist' not in track
or not track['artist']
@ -583,7 +556,7 @@ class MusicMpdPlugin(MusicPlugin):
return self._exec('playlistinfo', return_status=False)
@action
def get_playlists(self):
def get_playlists(self, *_, **__):
"""
:returns: The playlists available on the server as a list of dicts.
@ -602,11 +575,12 @@ class MusicMpdPlugin(MusicPlugin):
# ...
}
]
"""
return sorted(
self._exec('listplaylists', return_status=False),
key=lambda p: p['playlist'],
playlists: list = self._exec( # type: ignore
'listplaylists', return_status=False
)
return sorted(playlists, key=lambda p: p['playlist'])
@action
def listplaylists(self):
@ -616,14 +590,13 @@ class MusicMpdPlugin(MusicPlugin):
return self.get_playlists()
@action
def get_playlist(self, playlist, with_tracks=False):
def get_playlist(self, playlist: str, *_, with_tracks: bool = False, **__):
"""
List the items in the specified playlist.
:param playlist: Name of the playlist
:type playlist: str
:param with_tracks: If True then the list of tracks in the playlist will be returned as well (default: False).
:type with_tracks: bool
:param with_tracks: If True then the list of tracks in the playlist will
be returned as well (default: False).
"""
return self._exec(
'listplaylistinfo' if with_tracks else 'listplaylist',
@ -632,29 +605,26 @@ class MusicMpdPlugin(MusicPlugin):
)
@action
def listplaylist(self, name):
def listplaylist(self, name: str):
"""
Deprecated alias for :meth:`.playlist`.
"""
return self._exec('listplaylist', name, return_status=False)
@action
def listplaylistinfo(self, name):
def listplaylistinfo(self, name: str):
"""
Deprecated alias for :meth:`.playlist` with `with_tracks=True`.
Deprecated alias for :meth:`.playlist` with ``with_tracks=True``.
"""
return self.get_playlist(name, with_tracks=True)
@action
def add_to_playlist(self, playlist, resources):
def add_to_playlist(self, playlist: str, resources: Union[str, Collection[str]]):
"""
Add one or multiple resources to a playlist.
:param playlist: Playlist name
:type playlist: str
:param resources: URI or path of the resource(s) to be added
:type resources: str or list[str]
"""
if isinstance(resources, str):
@ -664,22 +634,21 @@ class MusicMpdPlugin(MusicPlugin):
self._exec('playlistadd', playlist, res)
@action
def playlistadd(self, name, uri):
def playlistadd(self, name: str, uri: str):
"""
Deprecated alias for :meth:`.add_to_playlist`.
"""
return self.add_to_playlist(name, uri)
@action
def remove_from_playlist(self, playlist, resources):
def remove_from_playlist(
self, playlist: str, resources: Union[int, Collection[int]], *_, **__
):
"""
Remove one or multiple tracks from a playlist.
:param playlist: Playlist name
:type playlist: str
:param resources: Position or list of positions to remove
:type resources: int or list[int]
"""
if isinstance(resources, str):
@ -691,62 +660,53 @@ class MusicMpdPlugin(MusicPlugin):
self._exec('playlistdelete', playlist, p)
@action
def playlist_move(self, playlist, from_pos, to_pos):
def playlist_move(self, playlist: str, from_pos: int, to_pos: int, *_, **__):
"""
Change the position of a track in the specified playlist.
:param playlist: Playlist name
:type playlist: str
:param from_pos: Original track position
:type from_pos: int
:param to_pos: New track position
:type to_pos: int
"""
self._exec('playlistmove', playlist, from_pos, to_pos)
@action
def playlistdelete(self, name, pos):
def playlistdelete(self, name: str, pos: int):
"""
Deprecated alias for :meth:`.remove_from_playlist`.
"""
return self.remove_from_playlist(name, pos)
@action
def playlistmove(self, name, from_pos, to_pos):
def playlistmove(self, name: str, from_pos: int, to_pos: int):
"""
Deprecated alias for :meth:`.playlist_move`.
"""
return self.playlist_move(name, from_pos=from_pos, to_pos=to_pos)
@action
def playlistclear(self, name):
def playlistclear(self, name: str):
"""
Clears all the elements from the specified playlist
Clears all the elements from the specified playlist.
:param name: Playlist name
:type name: str
:param name: Playlist name.
"""
self._exec('playlistclear', name)
@action
def rename(self, name, new_name):
def rename(self, name: str, new_name: str):
"""
Rename a playlist
Rename a playlist.
:param name: Original playlist name
:type name: str
:param new_name: New playlist name
:type name: str
"""
self._exec('rename', name, new_name)
@action
def lsinfo(self, uri=None):
def lsinfo(self, uri: Optional[str] = None):
"""
Returns the list of playlists and directories on the server
Returns the list of playlists and directories on the server.
"""
return (
@ -756,37 +716,32 @@ class MusicMpdPlugin(MusicPlugin):
)
@action
def plchanges(self, version):
def plchanges(self, version: int):
"""
Show what has changed on the current playlist since a specified playlist
version number.
:param version: Version number
:type version: int
:returns: A list of dicts representing the songs being added since the specified version
"""
return self._exec('plchanges', version, return_status=False)
@action
def searchaddplaylist(self, name):
def searchaddplaylist(self, name: str):
"""
Search and add a playlist by (partial or full) name
Search and add a playlist by (partial or full) name.
:param name: Playlist name, can be partial
:type name: str
:param name: Playlist name, can be partial.
"""
resp: list = self._exec('listplaylists', return_status=False) # type: ignore
playlists = [
pl['playlist']
for pl in filter(
lambda playlist: name.lower() in playlist['playlist'].lower(),
self._exec('listplaylists', return_status=False),
)
pl['playlist'] for pl in resp if name.lower() in pl['playlist'].lower()
]
if len(playlists):
if not playlists:
return None
self._exec('clear')
self._exec('load', playlists[0])
self._exec('play')
@ -799,40 +754,37 @@ class MusicMpdPlugin(MusicPlugin):
ll.extend([k, v])
return ll
# noinspection PyShadowingBuiltins
@action
def find(self, filter: dict, *args, **kwargs):
def find(self, filter: dict, *args, **kwargs): # pylint: disable=redefined-builtin
"""
Find in the database/library by filter.
:param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``)
:returns: list[dict]
"""
filter_list = self._make_filter(filter)
return self._exec('find', *filter_list, *args, return_status=False, **kwargs)
filter = self._make_filter(filter)
return self._exec('find', *filter, *args, return_status=False, **kwargs)
# noinspection PyShadowingBuiltins
@action
def findadd(self, filter: dict, *args, **kwargs):
def findadd(
self, filter: dict, *args, **kwargs # pylint: disable=redefined-builtin
):
"""
Find in the database/library by filter and add to the current playlist.
:param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``)
:returns: list[dict]
"""
filter_list = self._make_filter(filter)
return self._exec('findadd', *filter_list, *args, return_status=False, **kwargs)
filter = self._make_filter(filter)
return self._exec('findadd', *filter, *args, return_status=False, **kwargs)
# noinspection PyShadowingBuiltins
@action
def search(
self,
query: Optional[Union[str, dict]] = None,
filter: Optional[dict] = None,
*args,
**kwargs
query: Optional[Union[str, dict]] = None,
filter: Optional[dict] = None, # pylint: disable=redefined-builtin
**kwargs,
):
"""
Free search by filter.
@ -842,26 +794,37 @@ class MusicMpdPlugin(MusicPlugin):
``query``, it's still here for back-compatibility reasons.
:returns: list[dict]
"""
filter = self._make_filter(query or filter)
items = self._exec('search', *filter, *args, return_status=False, **kwargs)
assert query or filter, 'Specify either `query` or `filter`'
filt = filter
if isinstance(query, str):
filt = query
elif isinstance(query, dict):
filt = {**(filter or {}), **query}
filter_list = self._make_filter(filt) if isinstance(filt, dict) else [query]
items: list = self._exec( # type: ignore
'search', *filter_list, *args, return_status=False, **kwargs
)
# Spotify results first
return sorted(
items, key=lambda item: 0 if item['file'].startswith('spotify:') else 1
)
# noinspection PyShadowingBuiltins
@action
def searchadd(self, filter, *args, **kwargs):
def searchadd(self, filter: dict, *args, **kwargs):
"""
Free search by filter and add the results to the current playlist.
:param filter: Search filter (e.g. ``{"artist": "Led Zeppelin", "album": "IV"}``)
:returns: list[dict]
"""
filter = self._make_filter(filter)
return self._exec('searchadd', *filter, *args, return_status=False, **kwargs)
filter_list = self._make_filter(filter)
return self._exec(
'searchadd', *filter_list, *args, return_status=False, **kwargs
)
# vim:sw=4:ts=4:et:

View file

@ -1,6 +1,7 @@
import json
import socket
import threading
from typing import Collection, Optional
from platypush.config import Config
from platypush.context import get_backend
@ -9,7 +10,7 @@ from platypush.plugins import Plugin, action
class MusicSnapcastPlugin(Plugin):
"""
Plugin to interact with a [Snapcast](https://github.com/badaix/snapcast)
Plugin to interact with a `Snapcast <https://github.com/badaix/snapcast>`_
instance, control clients mute status, volume, playback etc.
See https://github.com/badaix/snapcast/blob/master/doc/json_rpc_api/v2_0_0.md
@ -19,15 +20,13 @@ class MusicSnapcastPlugin(Plugin):
_DEFAULT_SNAPCAST_PORT = 1705
_SOCKET_EOL = '\r\n'.encode()
def __init__(self, host='localhost', port=_DEFAULT_SNAPCAST_PORT, **kwargs):
def __init__(
self, host: str = 'localhost', port: int = _DEFAULT_SNAPCAST_PORT, **kwargs
):
"""
:param host: Default Snapcast server host (default: localhost)
:type host: str
:param port: Default Snapcast server control port (default: 1705)
:type port: int
"""
super().__init__(**kwargs)
self.host = host
@ -46,23 +45,24 @@ class MusicSnapcastPlugin(Plugin):
self._latest_req_id += 1
return self._latest_req_id
def _connect(self, host=None, port=None):
def _connect(self, host: Optional[str] = None, port: Optional[int] = None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.logger.info('Connecting to Snapcast host {}:{}'.format(host, port))
self.logger.info('Connecting to Snapcast host %s:%d', host, port)
sock.connect((host or self.host, port or self.port))
return sock
@classmethod
def _send(cls, sock, req):
def _send(cls, sock: socket.socket, req: dict):
if isinstance(req, dict):
req = json.dumps(req)
r = json.dumps(req)
if isinstance(req, str):
req = req.encode()
if not isinstance(req, bytes):
raise RuntimeError('Unsupported type {} for Snapcast request: {}'.
format(type(req), req))
r = req.encode()
if not isinstance(r, bytes):
raise RuntimeError(
f'Unsupported type {type(req)} for Snapcast request: {req}'
)
sock.send(req + cls._SOCKET_EOL)
sock.send(r + cls._SOCKET_EOL)
@classmethod
def _recv(cls, sock):
@ -71,54 +71,58 @@ class MusicSnapcastPlugin(Plugin):
buf += sock.recv(1)
return json.loads(buf.decode().strip()).get('result')
def _get_group(self, sock, group):
def _get_group(self, sock: socket.socket, group: str):
for g in self._status(sock).get('groups', []):
if group == g.get('id') or group == g.get('name'):
return g
def _get_client(self, sock, client):
return None
def _get_client(self, sock: socket.socket, client: str):
for g in self._status(sock).get('groups', []):
clients = g.get('clients', [])
for c in clients:
if client == c.get('id') or \
client == c.get('name') or \
client == c.get('host', {}).get('name') or \
client == c.get('host', {}).get('ip'):
if (
client == c.get('id')
or client == c.get('name')
or client == c.get('host', {}).get('name')
or client == c.get('host', {}).get('ip')
):
c['group_id'] = g.get('id')
return c
def _status(self, sock):
return None
def _status(self, sock: socket.socket):
request = {
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Server.GetStatus'
'method': 'Server.GetStatus',
}
# noinspection PyTypeChecker
self._send(sock, request)
return (self._recv(sock) or {}).get('server', {})
@action
def status(self, host=None, port=None, client=None, group=None):
def status(
self,
host: Optional[str] = None,
port: Optional[int] = None,
client: Optional[str] = None,
group: Optional[str] = None,
):
"""
Get the status either of a Snapcast server, client or group
:param host: Snapcast server to query (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
:param client: Client ID or name (default: None)
:type client: str
:param group: Group ID or name (default: None)
:type group: str
:returns: dict.
:returns: dict. Example:
Example::
.. code-block:: json
"output": {
"groups": [
@ -192,7 +196,7 @@ class MusicSnapcastPlugin(Plugin):
"name": "mopidy",
"sampleformat": "48000:16:2"
},
"raw": "pipe:////tmp/snapfifo?buffer_ms=20&codec=pcm&name=mopidy&sampleformat=48000:16:2",
"raw": "pipe:////tmp/fifo?buffer_ms=20&codec=pcm&name=mopidy&sampleformat=48000:16:2",
"scheme": "pipe"
}
}
@ -213,33 +217,32 @@ class MusicSnapcastPlugin(Plugin):
return self._status(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning(f'Error on socket close: {e}')
self.logger.warning('Error on socket close: %s', e)
@action
def mute(self, client=None, group=None, mute=None, host=None, port=None):
def mute(
self,
client: Optional[str] = None,
group: Optional[str] = None,
mute: Optional[bool] = None,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set the mute status of a connected client or group
:param client: Client name or ID to mute
:type client: str
:param group: Group ID to mute
:type group: str
:param mute: Mute status. If not set, the mute status of the selected
client/group will be toggled.
:type mute: bool
:param host: Snapcast server to query (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
if not client and not group:
if not (client and group):
raise RuntimeError('Please specify either a client or a group')
sock = None
@ -250,59 +253,62 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetMute' if group else 'Client.SetVolume',
'params': {}
'params': {},
}
if group:
group = self._get_group(sock, group)
cur_muted = group['muted']
request['params']['id'] = group['id']
g = self._get_group(sock, group)
assert g, f'No such group: {group}'
cur_muted = g['muted']
request['params']['id'] = g['id']
request['params']['mute'] = not cur_muted if mute is None else mute
else:
client = self._get_client(sock, client)
cur_muted = client['config']['volume']['muted']
request['params']['id'] = client['id']
elif client:
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
cur_muted = c['config']['volume']['muted']
request['params']['id'] = c['id']
request['params']['volume'] = {}
request['params']['volume']['percent'] = client['config']['volume']['percent']
request['params']['volume']['muted'] = not cur_muted if mute is None else mute
request['params']['volume']['percent'] = c['config']['volume'][
'percent'
]
request['params']['volume']['muted'] = (
not cur_muted if mute is None else mute
)
# noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def volume(self, client, volume=None, delta=None, mute=None, host=None,
port=None):
def volume(
self,
client: str,
volume: Optional[int] = None,
delta: Optional[int] = None,
mute: Optional[bool] = None,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set the volume of a connected client
Set the volume of a connected client.
:param client: Client name or ID
:type client: str
:param volume: Absolute volume to set between 0 and 100
:type volume: int
:param delta: Relative volume change in percentage (e.g. +10 or -10)
:type delta: int
:param mute: Set to true or false if you want to toggle the muted status
:type mute: bool
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
if volume is None and delta is None and mute is None:
raise RuntimeError('Please specify either an absolute volume or ' +
'relative delta')
raise RuntimeError(
'Please specify either an absolute volume or ' + 'relative delta'
)
sock = None
@ -312,56 +318,51 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetVolume',
'params': {}
'params': {},
}
client = self._get_client(sock, client)
cur_volume = int(client['config']['volume']['percent'])
cur_mute = bool(client['config']['volume']['muted'])
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
cur_volume = int(c['config']['volume']['percent'])
cur_mute = bool(c['config']['volume']['muted'])
if volume is not None:
volume = int(volume)
elif delta is not None:
volume = cur_volume + int(delta)
if volume is not None:
if volume > 100: volume = 100
if volume < 0: volume = 0
else:
volume = cur_volume
volume = max(0, min(100, volume)) if volume is not None else cur_volume
if mute is None:
mute = cur_mute
request['params']['id'] = client['id']
request['params']['id'] = c['id']
request['params']['volume'] = {}
request['params']['volume']['percent'] = volume
request['params']['volume']['muted'] = mute
# noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def set_client_name(self, client, name, host=None, port=None):
def set_client_name(
self,
client: str,
name: str,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set/change the name of a connected client
:param client: Current client name or ID to rename
:type client: str
:param name: New name
:type name: str
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
sock = None
@ -372,37 +373,37 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetName',
'params': {}
'params': {},
}
client = self._get_client(sock, client)
request['params']['id'] = client['id']
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['id'] = c['id']
request['params']['name'] = name
# noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def set_group_name(self, group, name, host=None, port=None):
def set_group_name(
self,
group: str,
name: str,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set/change the name of a group
:param group: Group ID to rename
:type group: str
:param name: New name
:type name: str
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
sock = None
@ -416,34 +417,33 @@ class MusicSnapcastPlugin(Plugin):
'params': {
'id': group,
'name': name,
}
},
}
# noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def set_latency(self, client, latency, host=None, port=None):
def set_latency(
self,
client: str,
latency: float,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Set/change the latency of a connected client
:param client: Client name or ID
:type client: str
:param latency: New latency in milliseconds
:type latency: float
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
sock = None
@ -454,35 +454,31 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Client.SetLatency',
'params': {
'latency': latency
}
'params': {'latency': latency},
}
client = self._get_client(sock, client)
request['params']['id'] = client['id']
# noinspection PyTypeChecker
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['id'] = c['id']
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def delete_client(self, client, host=None, port=None):
def delete_client(
self, client: str, host: Optional[str] = None, port: Optional[int] = None
):
"""
Delete a client from the Snapcast server
:param client: Client name or ID
:type client: str
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
sock = None
@ -493,132 +489,129 @@ class MusicSnapcastPlugin(Plugin):
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Server.DeleteClient',
'params': {}
'params': {},
}
client = self._get_client(sock, client)
request['params']['id'] = client['id']
# noinspection PyTypeChecker
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['id'] = c['id']
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def group_set_clients(self, group, clients, host=None, port=None):
def group_set_clients(
self,
group: str,
clients: Collection[str],
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Sets the clients for a group on a Snapcast server
:param group: Group name or ID
:type group: str
:param clients: List of client names or IDs
:type clients: list[str]
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
group = self._get_group(sock, group)
g = self._get_group(sock, group)
assert g, f'No such group: {group}'
request = {
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetClients',
'params': {
'id': group['id'],
'clients': []
}
'params': {'id': g['id'], 'clients': []},
}
for client in clients:
client = self._get_client(sock, client)
request['params']['clients'].append(client['id'])
c = self._get_client(sock, client)
assert c, f'No such client: {client}'
request['params']['clients'].append(c['id'])
# noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def group_set_stream(self, group, stream_id, host=None, port=None):
def group_set_stream(
self,
group: str,
stream_id: str,
host: Optional[str] = None,
port: Optional[int] = None,
):
"""
Sets the active stream for a group.
:param group: Group name or ID
:type group: str
:param stream_id: Stream ID
:type stream_id: str
:param host: Snapcast server (default: default configured host)
:type host: str
:param port: Snapcast server port (default: default configured port)
:type port: int
"""
sock = None
try:
sock = self._connect(host or self.host, port or self.port)
group = self._get_group(sock, group)
g = self._get_group(sock, group)
assert g, f'No such group: {group}'
request = {
'id': self._get_req_id(),
'jsonrpc': '2.0',
'method': 'Group.SetStream',
'params': {
'id': group['id'],
'id': g['id'],
'stream_id': stream_id,
}
},
}
# noinspection PyTypeChecker
self._send(sock, request)
return self._recv(sock)
finally:
try:
if sock:
sock.close()
except Exception as e:
self.logger.warning('Error on socket close', e)
self.logger.warning('Error on socket close: %s', e)
@action
def get_backend_hosts(self):
"""
:return: A dict with the Snapcast hosts configured on the backend
in the format host -> port
in the format ``host -> port``.
"""
hosts = {}
for i in range(len(self.backend_hosts)):
hosts[self.backend_hosts[i]] = self.backend_ports[i]
return hosts
return {
host: self.backend_ports[i] for i, host in enumerate(self.backend_hosts)
}
@action
def get_playing_streams(self, exclude_local=False):
def get_playing_streams(self, exclude_local: bool = False):
"""
Returns the remote streams configured in the `music.snapcast` backend
that are currently active and unmuted.
:param exclude_local: Exclude localhost connections (default: False)
:type exclude_local: bool
:returns: dict with the host->port mapping.
:returns: dict with the host->port mapping. Example:
Example::
.. code-block:: json
{
"hosts": {
@ -630,39 +623,54 @@ class MusicSnapcastPlugin(Plugin):
"""
backend_hosts = self.get_backend_hosts().output
backend_hosts: dict = self.get_backend_hosts().output # type: ignore
playing_hosts = {}
def _worker(host, port):
try:
if exclude_local and (host == 'localhost'
or host == Config.get('device_id')):
if exclude_local and (
host == 'localhost' or host == Config.get('device_id')
):
return
server_status = self.status(host=host, port=port).output
client_status = self.status(host=host, port=port,
client=Config.get('device_id')).output
server_status: dict = self.status(host=host, port=port).output # type: ignore
client_status: dict = self.status( # type: ignore
host=host, port=port, client=Config.get('device_id')
).output
if client_status.get('config', {}).get('volume', {}).get('muted'):
return
group = [g for g in server_status.get('groups', {})
if g.get('id') == client_status.get('group_id')].pop(0)
group = next(
iter(
g
for g in server_status.get('groups', {})
if g.get('id') == client_status.get('group_id')
)
)
if group.get('muted'):
return
stream = [s for s in server_status.get('streams')
if s.get('id') == group.get('stream_id')].pop(0)
stream = next(
iter(
s
for s in server_status.get('streams', {})
if s.get('id') == group.get('stream_id')
)
)
if stream.get('status') != 'playing':
return
playing_hosts[host] = port
except Exception as e:
self.logger.warning(('Error while retrieving the status of ' +
'Snapcast host at {}:{}: {}').format(
host, port, str(e)))
self.logger.warning(
'Error while retrieving the status of Snapcast host at %s:%d: %s',
host,
port,
e,
)
workers = []
@ -677,4 +685,5 @@ class MusicSnapcastPlugin(Plugin):
return {'hosts': playing_hosts}
# vim:sw=4:ts=4:et:

View file

@ -79,10 +79,6 @@ class ZwaveMqttPlugin(
This plugin allows you to manage a Z-Wave network over MQTT through
`zwave-js-ui <https://github.com/zwave-js/zwave-js-ui>`_.
For historical reasons, it is advised to enable this plugin together
with the ``zwave.mqtt`` backend, or you may lose the ability to listen
to asynchronous events.
Configuration required on the zwave-js-ui gateway:
* Install the gateway following the instructions reported

View file

@ -134,7 +134,7 @@ class DocstringParser:
return None
lines = text.split("\n")
return (lines[0] + " " + tw.dedent("\n".join(lines[1:]) or "")).strip()
return (lines[0] + "\n" + tw.dedent("\n".join(lines[1:]) or "")).strip()
ctx = ParseContext(obj)
yield ctx
@ -203,13 +203,14 @@ class DocstringParser:
return
# Update the current parameter docstring if required
if (
ctx.state == ParseState.PARAM
and cls._is_continuation_line(line)
and ctx.cur_param in ctx.parsed_params
):
if ctx.state == ParseState.PARAM and cls._is_continuation_line(line):
if ctx.cur_param in ctx.parsed_params:
ctx.parsed_params[ctx.cur_param].doc = (
((ctx.parsed_params[ctx.cur_param].doc or "") + "\n" + line.rstrip())
(
(ctx.parsed_params[ctx.cur_param].doc or "")
+ "\n"
+ line.rstrip()
)
if ctx.parsed_params.get(ctx.cur_param)
and ctx.parsed_params[ctx.cur_param].doc
else ""

View file

@ -110,6 +110,7 @@ setup(
# Support for Google text2speech plugin
'google-tts': [
'oauth2client',
'httplib2',
'google-api-python-client',
'google-auth',
'google-cloud-texttospeech',
@ -130,7 +131,12 @@ setup(
'google-assistant-legacy': ['google-assistant-library', 'google-auth'],
'google-assistant': ['google-assistant-sdk[samples]', 'google-auth'],
# Support for the Google APIs
'google': ['oauth2client', 'google-auth', 'google-api-python-client'],
'google': [
'oauth2client',
'google-auth',
'google-api-python-client',
'httplib2',
],
# Support for Last.FM scrobbler plugin
'lastfm': ['pylast'],
# Support for custom hotword detection
@ -213,9 +219,9 @@ setup(
# Support for Trello integration
'trello': ['py-trello'],
# Support for Google Pub/Sub
'google-pubsub': ['google-cloud-pubsub', 'google-auth'],
'google-pubsub': ['google-cloud-pubsub', 'google-auth', 'httplib2'],
# Support for Google Translate
'google-translate': ['google-cloud-translate', 'google-auth'],
'google-translate': ['google-cloud-translate', 'google-auth', 'httplib2'],
# Support for keyboard/mouse plugin
'inputs': ['pyuserinput'],
# Support for Buienradar weather forecast