From f7c594cc3f42c4bfa044150363d8f47b9afbc05c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Sat, 2 Apr 2022 22:47:23 +0200 Subject: [PATCH 01/12] get_bus() should return a default RedisBus() instance if the main bus is not registered --- platypush/context/__init__.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/platypush/context/__init__.py b/platypush/context/__init__.py index 43f764d73..bcadf289a 100644 --- a/platypush/context/__init__.py +++ b/platypush/context/__init__.py @@ -133,8 +133,11 @@ def get_plugin(plugin_name, reload=False): def get_bus() -> Bus: global main_bus - assert main_bus, 'The bus is not registered' - return main_bus + if main_bus: + return main_bus + + from platypush.bus.redis import RedisBus + return RedisBus() def get_or_create_event_loop(): From 486801653a1dc0ba64316031b837d822305a65cc Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Sun, 3 Apr 2022 00:26:39 +0200 Subject: [PATCH 02/12] Added `.exception` action to logger plugin --- platypush/plugins/logger/__init__.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/platypush/plugins/logger/__init__.py b/platypush/plugins/logger/__init__.py index efac544d6..73de5fb73 100644 --- a/platypush/plugins/logger/__init__.py +++ b/platypush/plugins/logger/__init__.py @@ -41,6 +41,13 @@ class LoggerPlugin(Plugin): """ self.logger.error(msg, *args, **kwargs) + @action + def exception(self, exception, *args, **kwargs): + """ + logger.exception wrapper + """ + self.logger.exception(exception, *args, **kwargs) + # vim:sw=4:ts=4:et: From 1b30bfc454c96be03562ffdb406b9b7b2843fa5e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Mon, 4 Apr 2022 17:21:47 +0200 Subject: [PATCH 03/12] Added more pre-commit hooks --- .pre-commit-config.yaml | 21 +++++++++++++++++++-- setup.cfg | 3 +++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c2f163aa7..18c7457a1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -2,14 +2,31 @@ # See https://pre-commit.com/hooks.html for more hooks repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v3.2.0 + rev: v4.1.0 hooks: # - id: trailing-whitespace # - id: end-of-file-fixer - id: check-yaml + - id: check-json + - id: check-xml + - id: check-symlinks - id: check-added-large-files - repo: https://github.com/Lucas-C/pre-commit-hooks-nodejs - rev: v1.1.1 + rev: v1.1.2 hooks: - id: markdown-toc + +- repo: https://github.com/pycqa/flake8 + rev: 4.0.1 + hooks: + - id: flake8 + additional_dependencies: + - flake8-bugbear + - flake8-comprehensions + - flake8-simplify + +- repo: https://github.com/psf/black + rev: 22.3.0 + hooks: + - id: black diff --git a/setup.cfg b/setup.cfg index 862dce588..1dd336c25 100644 --- a/setup.cfg +++ b/setup.cfg @@ -5,3 +5,6 @@ tag = True [metadata] description-file = README.md + +[flake8] +max-line-length = 120 From ca25607262245d016024fc882fa7f4e519813261 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Mon, 4 Apr 2022 20:55:10 +0200 Subject: [PATCH 04/12] Skip string and underscore normalization in black --- pyproject.toml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 pyproject.toml diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 000000000..e7c6caf61 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,4 @@ +[tool.black] +skip-string-normalization = true +skip-numeric-underscore-normalization = true + From 12887b61fe4d789024c24b3f6310e8880c9188a8 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Mon, 25 Apr 2022 14:02:13 +0200 Subject: [PATCH 05/12] Don't fail hard if the Linode API doesn't return a list of instances --- platypush/backend/linode/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platypush/backend/linode/__init__.py b/platypush/backend/linode/__init__.py index 756eb2757..216a9f621 100644 --- a/platypush/backend/linode/__init__.py +++ b/platypush/backend/linode/__init__.py @@ -26,7 +26,7 @@ class LinodeBackend(SensorBackend): self.instances = set(instances or []) def process_data(self, data: Dict[str, dict], new_data: Optional[Dict[str, dict]] = None, **kwargs): - instances = data['instances'] + instances = data.get('instances', {}) old_instances = (self.data or {}).get('instances', {}) if self.instances: From a80adc996ff9459614b2d54952568fcd6709fe6a Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Mon, 25 Apr 2022 16:54:26 +0200 Subject: [PATCH 06/12] [WIP] Default config.yaml in case a configuration file is missing in the default locations --- MANIFEST.in | 1 + platypush/config/config.auto.yaml | 7 +++++++ platypush/config/config.default.yaml | 2 ++ 3 files changed, 10 insertions(+) create mode 100644 platypush/config/config.auto.yaml create mode 100644 platypush/config/config.default.yaml diff --git a/MANIFEST.in b/MANIFEST.in index e61e3251c..9163c3d77 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,4 @@ recursive-include platypush/backend/http/webapp/dist * include platypush/plugins/http/webpage/mercury-parser.js +include platypush/config/*.yaml global-include manifest.yaml diff --git a/platypush/config/config.auto.yaml b/platypush/config/config.auto.yaml new file mode 100644 index 000000000..466458baa --- /dev/null +++ b/platypush/config/config.auto.yaml @@ -0,0 +1,7 @@ +# Auto-generated configuration file. +# Do not edit manually - use the config.yaml file for manual modifications +# instead + +backend.http: + port: 8008 + websocket_port: 8009 diff --git a/platypush/config/config.default.yaml b/platypush/config/config.default.yaml new file mode 100644 index 000000000..1e36005fe --- /dev/null +++ b/platypush/config/config.default.yaml @@ -0,0 +1,2 @@ +include: + - config.auto.yaml From da73a5f1b9db2d4286ec3341d1c8e609b1dc703f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Tue, 26 Apr 2022 19:30:26 +0200 Subject: [PATCH 07/12] Replaced deprecated json_output arg in NextCloud client with response.json_data --- platypush/plugins/nextcloud/__init__.py | 272 +++++++++++++++++++----- 1 file changed, 215 insertions(+), 57 deletions(-) diff --git a/platypush/plugins/nextcloud/__init__.py b/platypush/plugins/nextcloud/__init__.py index a2306cb3b..682c97d55 100644 --- a/platypush/plugins/nextcloud/__init__.py +++ b/platypush/plugins/nextcloud/__init__.py @@ -50,8 +50,13 @@ class NextcloudPlugin(Plugin): """ - def __init__(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, - **kwargs): + def __init__( + self, + url: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + **kwargs + ): """ :param url: URL to the index of your default NextCloud instance. :param username: Default NextCloud username. @@ -61,8 +66,13 @@ class NextcloudPlugin(Plugin): self.conf = ClientConfig(url=url, username=username, password=password) self._client = self._get_client(**self.conf.to_dict()) - def _get_client(self, url: Optional[str] = None, username: Optional[str] = None, password: Optional[str] = None, - raise_on_empty: bool = False): + def _get_client( + self, + url: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + raise_on_empty: bool = False, + ): from nextcloud import NextCloud if not url: @@ -71,19 +81,25 @@ class NextcloudPlugin(Plugin): raise AssertionError('No url/username/password provided') return None - return NextCloud(endpoint=self.conf.url, user=self.conf.username, password=self.conf.password, - json_output=True) + return NextCloud( + endpoint=self.conf.url, + user=self.conf.username, + password=self.conf.password, + ) - return NextCloud(endpoint=url, user=username, password=password, json_output=True) + return NextCloud(endpoint=url, user=username, password=password) @staticmethod def _get_permissions(permissions: Optional[List[str]]) -> int: int_perm = 0 - for perm in (permissions or []): + for perm in permissions or []: perm = perm.upper() - assert hasattr(Permission, perm), 'Unknown permissions type: {}. Supported permissions: {}'.format( - perm, [p.name.lower() for p in Permission]) + assert hasattr( + Permission, perm + ), 'Unknown permissions type: {}. Supported permissions: {}'.format( + perm, [p.name.lower() for p in Permission] + ) if perm == 'ALL': int_perm = Permission.ALL.value @@ -96,8 +112,11 @@ class NextcloudPlugin(Plugin): @staticmethod def _get_share_type(share_type: str) -> int: share_type = share_type.upper() - assert hasattr(ShareType, share_type), 'Unknown share type: {}. Supported share types: {}'.format( - share_type, [s.name.lower() for s in ShareType]) + assert hasattr( + ShareType, share_type + ), 'Unknown share type: {}. Supported share types: {}'.format( + share_type, [s.name.lower() for s in ShareType] + ) return getattr(ShareType, share_type).value @@ -114,13 +133,23 @@ class NextcloudPlugin(Plugin): args=', '.join(args), sep=', ' if args and kwargs else '', kwargs=', '.join(['{}={}'.format(k, v) for k, v in kwargs.items()]), - error=response.meta.get('message', '[No message]') if hasattr(response, 'meta') else response.raw.reason) + error=response.meta.get('message', '[No message]') + if hasattr(response, 'meta') + else response.raw.reason, + ) - return response.data + return response.json_data @action - def get_activities(self, since: Optional[id] = None, limit: Optional[int] = None, object_type: Optional[str] = None, - object_id: Optional[int] = None, sort: str = 'desc', **server_args) -> List[str]: + def get_activities( + self, + since: Optional[id] = None, + limit: Optional[int] = None, + object_type: Optional[str] = None, + object_id: Optional[int] = None, + sort: str = 'desc', + **server_args + ) -> List[str]: """ Get the list of recent activities on an instance. @@ -132,9 +161,15 @@ class NextcloudPlugin(Plugin): :param server_args: Override the default server settings (see :meth:`._get_client` arguments). :return: The list of selected activities. """ - return self._execute(server_args, 'get_activities', since=since, limit=limit, object_type=object_type, - object_id=object_id, - sort=sort) + return self._execute( + server_args, + 'get_activities', + since=since, + limit=limit, + object_type=object_type, + object_id=object_id, + sort=sort, + ) @action def get_apps(self, **server_args) -> List[str]: @@ -216,8 +251,13 @@ class NextcloudPlugin(Plugin): return self._execute(server_args, 'get_group', group_id) @action - def get_groups(self, search: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - **server_args) -> List[str]: + def get_groups( + self, + search: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + **server_args + ) -> List[str]: """ Search for groups. @@ -226,7 +266,9 @@ class NextcloudPlugin(Plugin): :param offset: Start offset. :param server_args: Override the default server settings (see :meth:`._get_client` arguments). """ - return self._execute(server_args, 'get_groups', search=search, limit=limit, offset=offset).get('groups', []) + return self._execute( + server_args, 'get_groups', search=search, limit=limit, offset=offset + ).get('groups', []) @action def create_group_folder(self, name: str, **server_args): @@ -268,7 +310,9 @@ class NextcloudPlugin(Plugin): return self._execute(server_args, 'get_group_folders') @action - def rename_group_folder(self, folder_id: Union[int, str], new_name: str, **server_args): + def rename_group_folder( + self, folder_id: Union[int, str], new_name: str, **server_args + ): """ Rename a group folder. @@ -279,7 +323,9 @@ class NextcloudPlugin(Plugin): self._execute(server_args, 'rename_group_folder', folder_id, new_name) @action - def grant_access_to_group_folder(self, folder_id: Union[int, str], group_id: str, **server_args): + def grant_access_to_group_folder( + self, folder_id: Union[int, str], group_id: str, **server_args + ): """ Grant access to a group folder to a given group. @@ -290,7 +336,9 @@ class NextcloudPlugin(Plugin): self._execute(server_args, 'grant_access_to_group_folder', folder_id, group_id) @action - def revoke_access_to_group_folder(self, folder_id: Union[int, str], group_id: str, **server_args): + def revoke_access_to_group_folder( + self, folder_id: Union[int, str], group_id: str, **server_args + ): """ Revoke access to a group folder to a given group. @@ -301,7 +349,9 @@ class NextcloudPlugin(Plugin): self._execute(server_args, 'revoke_access_to_group_folder', folder_id, group_id) @action - def set_group_folder_quota(self, folder_id: Union[int, str], quota: Optional[int], **server_args): + def set_group_folder_quota( + self, folder_id: Union[int, str], quota: Optional[int], **server_args + ): """ Set the quota of a group folder. @@ -309,11 +359,21 @@ class NextcloudPlugin(Plugin): :param quota: Quota in bytes - set None for unlimited. :param server_args: Override the default server settings (see :meth:`._get_client` arguments). """ - self._execute(server_args, 'set_quota_of_group_folder', folder_id, quota if quota is not None else -3) + self._execute( + server_args, + 'set_quota_of_group_folder', + folder_id, + quota if quota is not None else -3, + ) @action - def set_group_folder_permissions(self, folder_id: Union[int, str], group_id: str, permissions: List[str], - **server_args): + def set_group_folder_permissions( + self, + folder_id: Union[int, str], + group_id: str, + permissions: List[str], + **server_args + ): """ Set the permissions on a folder for a group. @@ -330,8 +390,13 @@ class NextcloudPlugin(Plugin): :param server_args: Override the default server settings (see :meth:`._get_client` arguments). """ - self._execute(server_args, 'set_permissions_to_group_folder', folder_id, group_id, - self._get_permissions(permissions)) + self._execute( + server_args, + 'set_permissions_to_group_folder', + folder_id, + group_id, + self._get_permissions(permissions), + ) @action def get_notifications(self, **server_args) -> list: @@ -372,8 +437,16 @@ class NextcloudPlugin(Plugin): self._execute(server_args, 'delete_notification', notification_id) @action - def create_share(self, path: str, share_type: str, share_with: Optional[str] = None, public_upload: bool = False, - password: Optional[str] = None, permissions: Optional[List[str]] = None, **server_args) -> dict: + def create_share( + self, + path: str, + share_type: str, + share_with: Optional[str] = None, + public_upload: bool = False, + password: Optional[str] = None, + permissions: Optional[List[str]] = None, + **server_args + ) -> dict: """ Share a file/folder with a user/group or a public link. @@ -442,9 +515,16 @@ class NextcloudPlugin(Plugin): """ share_type = self._get_share_type(share_type) permissions = self._get_permissions(permissions or ['read']) - return self._execute(server_args, 'create_share', path, share_type=share_type, share_with=share_with, - public_upload=public_upload, - password=password, permissions=permissions) + return self._execute( + server_args, + 'create_share', + path, + share_type=share_type, + share_with=share_with, + public_upload=public_upload, + password=password, + permissions=permissions, + ) @action def get_shares(self, **server_args) -> List[dict]: @@ -516,8 +596,15 @@ class NextcloudPlugin(Plugin): return self._execute(server_args, 'get_share_info', str(share_id)) @action - def update_share(self, share_id: int, public_upload: Optional[bool] = None, password: Optional[str] = None, - permissions: Optional[List[str]] = None, expire_date: Optional[str] = None, **server_args): + def update_share( + self, + share_id: int, + public_upload: Optional[bool] = None, + password: Optional[str] = None, + permissions: Optional[List[str]] = None, + expire_date: Optional[str] = None, + **server_args + ): """ Update the permissions of a shared resource. @@ -539,8 +626,15 @@ class NextcloudPlugin(Plugin): if permissions: permissions = self._get_permissions(permissions) - self._execute(server_args, 'update_share', share_id, public_upload=public_upload, password=password, - permissions=permissions, expire_date=expire_date) + self._execute( + server_args, + 'update_share', + share_id, + public_upload=public_upload, + password=password, + permissions=permissions, + expire_date=expire_date, + ) @action def create_user(self, user_id: str, password: str, **server_args): @@ -611,8 +705,13 @@ class NextcloudPlugin(Plugin): return self._execute(server_args, 'get_user', user_id) @action - def get_users(self, search: Optional[str] = None, limit: Optional[int] = None, offset: Optional[int] = None, - **server_args) -> List[str]: + def get_users( + self, + search: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + **server_args + ) -> List[str]: """ Get the list of users matching some search criteria. @@ -621,7 +720,9 @@ class NextcloudPlugin(Plugin): :param offset: Search results offset (default: None). :return: List of the matched user IDs. """ - return self._execute(server_args, 'get_users', search=search, limit=limit, offset=offset) + return self._execute( + server_args, 'get_users', search=search, limit=limit, offset=offset + ) @action def delete_user(self, user_id: str, **server_args): @@ -733,8 +834,15 @@ class NextcloudPlugin(Plugin): self._execute(server_args, 'delete_path', user_id, path) @action - def upload_file(self, remote_path: str, local_path: Optional[str] = None, content: Optional[str] = None, - user_id: Optional[str] = None, timestamp: Optional[Union[datetime, int, str]] = None, **server_args): + def upload_file( + self, + remote_path: str, + local_path: Optional[str] = None, + content: Optional[str] = None, + user_id: Optional[str] = None, + timestamp: Optional[Union[datetime, int, str]] = None, + **server_args + ): """ Upload a file. @@ -753,17 +861,32 @@ class NextcloudPlugin(Plugin): if isinstance(timestamp, datetime): timestamp = int(timestamp.timestamp()) - assert (local_path or content) and not (local_path and content), 'Please specify either local_path or content' + assert (local_path or content) and not ( + local_path and content + ), 'Please specify either local_path or content' if local_path: method = 'upload_file' local_path = os.path.abspath(os.path.expanduser(local_path)) else: method = 'upload_file_contents' - return self._execute(server_args, method, user_id, local_path or content, remote_path, timestamp=timestamp) + return self._execute( + server_args, + method, + user_id, + local_path or content, + remote_path, + timestamp=timestamp, + ) @action - def download_file(self, remote_path: str, local_path: str, user_id: Optional[str] = None, **server_args): + def download_file( + self, + remote_path: str, + local_path: str, + user_id: Optional[str] = None, + **server_args + ): """ Download a file. @@ -783,8 +906,14 @@ class NextcloudPlugin(Plugin): os.chdir(cur_dir) @action - def list(self, path: str, user_id: Optional[str] = None, depth: int = 1, all_properties: bool = False, - **server_args) -> List[dict]: + def list( + self, + path: str, + user_id: Optional[str] = None, + depth: int = 1, + all_properties: bool = False, + **server_args + ) -> List[dict]: """ List the content of a folder on the NextCloud instance. @@ -795,10 +924,19 @@ class NextcloudPlugin(Plugin): :param server_args: Override the default server settings (see :meth:`._get_client` arguments). """ user_id = user_id or server_args.get('username', self.conf.username) - return self._execute(server_args, 'list_folders', user_id, path, depth=depth, all_properties=all_properties) + return self._execute( + server_args, + 'list_folders', + user_id, + path, + depth=depth, + all_properties=all_properties, + ) @action - def list_favorites(self, path: Optional[str] = None, user_id: Optional[str] = None, **server_args) -> List[dict]: + def list_favorites( + self, path: Optional[str] = None, user_id: Optional[str] = None, **server_args + ) -> List[dict]: """ List the favorite items for a user. @@ -810,7 +948,9 @@ class NextcloudPlugin(Plugin): return self._execute(server_args, 'list_folders', user_id, path) @action - def mark_favorite(self, path: Optional[str] = None, user_id: Optional[str] = None, **server_args): + def mark_favorite( + self, path: Optional[str] = None, user_id: Optional[str] = None, **server_args + ): """ Add a path to a user's favorites. @@ -822,7 +962,14 @@ class NextcloudPlugin(Plugin): self._execute(server_args, 'set_favorites', user_id, path) @action - def copy(self, path: str, destination: str, user_id: Optional[str] = None, overwrite: bool = False, **server_args): + def copy( + self, + path: str, + destination: str, + user_id: Optional[str] = None, + overwrite: bool = False, + **server_args + ): """ Copy a resource to another path. @@ -833,10 +980,19 @@ class NextcloudPlugin(Plugin): :param server_args: Override the default server settings (see :meth:`._get_client` arguments). """ user_id = user_id or server_args.get('username', self.conf.username) - self._execute(server_args, 'copy_path', user_id, path, destination, overwrite=overwrite) + self._execute( + server_args, 'copy_path', user_id, path, destination, overwrite=overwrite + ) @action - def move(self, path: str, destination: str, user_id: Optional[str] = None, overwrite: bool = False, **server_args): + def move( + self, + path: str, + destination: str, + user_id: Optional[str] = None, + overwrite: bool = False, + **server_args + ): """ Move a resource to another path. @@ -847,7 +1003,9 @@ class NextcloudPlugin(Plugin): :param server_args: Override the default server settings (see :meth:`._get_client` arguments). """ user_id = user_id or server_args.get('username', self.conf.username) - self._execute(server_args, 'move_path', user_id, path, destination, overwrite=overwrite) + self._execute( + server_args, 'move_path', user_id, path, destination, overwrite=overwrite + ) # vim:sw=4:ts=4:et: From 371fd7e46b1c4cbf2f7e3467c9a0e0d9bc7780b4 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Wed, 27 Apr 2022 13:57:42 +0200 Subject: [PATCH 08/12] Generate a default config.yaml if none is present instead of failing --- platypush/config/__init__.py | 178 ++++++++++++------ platypush/config/config.auto.yaml | 3 +- .../{config.default.yaml => config.yaml} | 0 3 files changed, 124 insertions(+), 57 deletions(-) rename platypush/config/{config.default.yaml => config.yaml} (100%) diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 99e61e7b4..e9acfec45 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -1,4 +1,5 @@ import datetime +import glob import importlib import inspect import logging @@ -6,19 +7,25 @@ import os import pathlib import pkgutil import re +import shutil import socket import sys from typing import Optional import yaml -from platypush.utils import get_hash, is_functional_procedure, is_functional_hook, is_functional_cron +from platypush.utils import ( + get_hash, + is_functional_procedure, + is_functional_hook, + is_functional_cron, +) """ Config singleton instance """ _default_config_instance = None -class Config(object): +class Config: """ Configuration base class Usage: @@ -45,7 +52,9 @@ class Config(object): 'now': datetime.datetime.now, } - _workdir_location = os.path.join(os.path.expanduser('~'), '.local', 'share', 'platypush') + _workdir_location = os.path.join( + os.path.expanduser('~'), '.local', 'share', 'platypush' + ) _included_files = set() def __init__(self, cfgfile=None): @@ -61,14 +70,12 @@ class Config(object): cfgfile = self._get_default_cfgfile() if cfgfile is None: - raise RuntimeError('No config file specified and nothing found in {}' - .format(self._cfgfile_locations)) + cfgfile = self._create_default_config() self._cfgfile = os.path.abspath(os.path.expanduser(cfgfile)) self._config = self._read_config_file(self._cfgfile) if 'token' in self._config: - self._config['token'] = self._config['token'] self._config['token_hash'] = get_hash(self._config['token']) if 'workdir' not in self._config: @@ -76,11 +83,15 @@ class Config(object): os.makedirs(self._config['workdir'], exist_ok=True) if 'scripts_dir' not in self._config: - self._config['scripts_dir'] = os.path.join(os.path.dirname(cfgfile), 'scripts') + self._config['scripts_dir'] = os.path.join( + os.path.dirname(cfgfile), 'scripts' + ) os.makedirs(self._config['scripts_dir'], mode=0o755, exist_ok=True) if 'dashboards_dir' not in self._config: - self._config['dashboards_dir'] = os.path.join(os.path.dirname(cfgfile), 'dashboards') + self._config['dashboards_dir'] = os.path.join( + os.path.dirname(cfgfile), 'dashboards' + ) os.makedirs(self._config['dashboards_dir'], mode=0o755, exist_ok=True) init_py = os.path.join(self._config['scripts_dir'], '__init__.py') @@ -90,13 +101,20 @@ class Config(object): # Include scripts_dir parent in sys.path so members can be imported in scripts # through the `scripts` package - scripts_parent_dir = str(pathlib.Path(self._config['scripts_dir']).absolute().parent) + scripts_parent_dir = str( + pathlib.Path(self._config['scripts_dir']).absolute().parent + ) sys.path = [scripts_parent_dir] + sys.path - self._config['db'] = self._config.get('main.db', { - 'engine': 'sqlite:///' + os.path.join( - os.path.expanduser('~'), '.local', 'share', 'platypush', 'main.db') - }) + self._config['db'] = self._config.get( + 'main.db', + { + 'engine': 'sqlite:///' + + os.path.join( + os.path.expanduser('~'), '.local', 'share', 'platypush', 'main.db' + ) + }, + ) logging_config = { 'level': logging.INFO, @@ -112,8 +130,11 @@ class Config(object): try: os.makedirs(logdir, exist_ok=True) except Exception as e: - print('Unable to create logs directory {}: {}'.format( - logdir, str(e))) + print( + 'Unable to create logs directory {}: {}'.format( + logdir, str(e) + ) + ) v = logfile del logging_config['stream'] @@ -150,9 +171,18 @@ class Config(object): self._init_components() self._init_dashboards(self._config['dashboards_dir']) + def _create_default_config(self): + cfg_mod_dir = os.path.dirname(os.path.abspath(__file__)) + cfgfile = self._cfgfile_locations[0] + cfgdir = pathlib.Path(cfgfile).parent + cfgdir.mkdir(parents=True, exist_ok=True) + for cfgfile in glob.glob(os.path.join(cfg_mod_dir, 'config*.yaml')): + shutil.copy(cfgfile, str(cfgdir)) + + return cfgfile + def _read_config_file(self, cfgfile): - cfgfile_dir = os.path.dirname(os.path.abspath( - os.path.expanduser(cfgfile))) + cfgfile_dir = os.path.dirname(os.path.abspath(os.path.expanduser(cfgfile))) config = {} @@ -164,9 +194,11 @@ class Config(object): for section in file_config: if section == 'include': - include_files = file_config[section] \ - if isinstance(file_config[section], list) \ + include_files = ( + file_config[section] + if isinstance(file_config[section], list) else [file_config[section]] + ) for include_file in include_files: if not os.path.isabs(include_file): @@ -178,9 +210,13 @@ class Config(object): config[incl_section] = included_config[incl_section] elif section == 'scripts_dir': assert isinstance(file_config[section], str) - config['scripts_dir'] = os.path.abspath(os.path.expanduser(file_config[section])) - elif 'disabled' not in file_config[section] \ - or file_config[section]['disabled'] is False: + config['scripts_dir'] = os.path.abspath( + os.path.expanduser(file_config[section]) + ) + elif ( + 'disabled' not in file_config[section] + or file_config[section]['disabled'] is False + ): config[section] = file_config[section] return config @@ -189,27 +225,37 @@ class Config(object): try: module = importlib.import_module(modname) except Exception as e: - print('Unhandled exception while importing module {}: {}'.format(modname, str(e))) + print( + 'Unhandled exception while importing module {}: {}'.format( + modname, str(e) + ) + ) return prefix = modname + '.' if prefix is None else prefix - self.procedures.update(**{ - prefix + name: obj - for name, obj in inspect.getmembers(module) - if is_functional_procedure(obj) - }) + self.procedures.update( + **{ + prefix + name: obj + for name, obj in inspect.getmembers(module) + if is_functional_procedure(obj) + } + ) - self.event_hooks.update(**{ - prefix + name: obj - for name, obj in inspect.getmembers(module) - if is_functional_hook(obj) - }) + self.event_hooks.update( + **{ + prefix + name: obj + for name, obj in inspect.getmembers(module) + if is_functional_hook(obj) + } + ) - self.cronjobs.update(**{ - prefix + name: obj - for name, obj in inspect.getmembers(module) - if is_functional_cron(obj) - }) + self.cronjobs.update( + **{ + prefix + name: obj + for name, obj in inspect.getmembers(module) + if is_functional_cron(obj) + } + ) def _load_scripts(self): scripts_dir = self._config['scripts_dir'] @@ -218,14 +264,19 @@ class Config(object): scripts_modname = os.path.basename(scripts_dir) self._load_module(scripts_modname, prefix='') - for _, modname, _ in pkgutil.walk_packages(path=[scripts_dir], onerror=lambda x: None): + for _, modname, _ in pkgutil.walk_packages( + path=[scripts_dir], onerror=lambda _: None + ): self._load_module(modname) sys.path = sys_path def _init_components(self): for key in self._config.keys(): - if key.startswith('backend.') and '.'.join(key.split('.')[1:]) in self._backend_manifests: + if ( + key.startswith('backend.') + and '.'.join(key.split('.')[1:]) in self._backend_manifests + ): backend_name = '.'.join(key.split('.')[1:]) self.backends[backend_name] = self._config[key] elif key.startswith('event.hook.'): @@ -236,7 +287,7 @@ class Config(object): self.cronjobs[cron_name] = self._config[key] elif key.startswith('procedure.'): tokens = key.split('.') - _async = True if len(tokens) > 2 and tokens[1] == 'async' else False + _async = bool(len(tokens) > 2 and tokens[1] == 'async') procedure_name = '.'.join(tokens[2:] if len(tokens) > 2 else tokens[1:]) args = [] m = re.match(r'^([^(]+)\(([^)]+)\)\s*', procedure_name) @@ -265,7 +316,11 @@ class Config(object): self._init_manifests(plugins_dir) self._init_manifests(backends_dir) else: - manifests_map = self._plugin_manifests if base_dir.endswith('plugins') else self._backend_manifests + manifests_map = ( + self._plugin_manifests + if base_dir.endswith('plugins') + else self._backend_manifests + ) for mf in pathlib.Path(base_dir).rglob('manifest.yaml'): with open(mf, 'r') as f: manifest = yaml.safe_load(f)['manifest'] @@ -279,12 +334,11 @@ class Config(object): for (key, value) in self._default_constants.items(): self.constants[key] = value - @staticmethod - def get_dashboard(name: str, dashboards_dir: Optional[str] = None) -> Optional[str]: - global _default_config_instance - - # noinspection PyProtectedMember,PyProtectedMember,PyUnresolvedReferences - dashboards_dir = dashboards_dir or _default_config_instance._config['dashboards_dir'] + def _get_dashboard( + self, name: str, dashboards_dir: Optional[str] = None + ) -> Optional[str]: + dashboards_dir = dashboards_dir or self._config['dashboards_dir'] + assert dashboards_dir abspath = os.path.join(dashboards_dir, name + '.xml') if not os.path.isfile(abspath): return @@ -292,24 +346,37 @@ class Config(object): with open(abspath, 'r') as fp: return fp.read() - @classmethod - def get_dashboards(cls, dashboards_dir: Optional[str] = None) -> dict: - global _default_config_instance + def _get_dashboards(self, dashboards_dir: Optional[str] = None) -> dict: dashboards = {} - # noinspection PyProtectedMember,PyProtectedMember,PyUnresolvedReferences - dashboards_dir = dashboards_dir or _default_config_instance._config['dashboards_dir'] + dashboards_dir = dashboards_dir or self._config['dashboards_dir'] + assert dashboards_dir + for f in os.listdir(dashboards_dir): abspath = os.path.join(dashboards_dir, f) if not os.path.isfile(abspath) or not abspath.endswith('.xml'): continue name = f.split('.xml')[0] - dashboards[name] = cls.get_dashboard(name, dashboards_dir) + dashboards[name] = self._get_dashboard(name, dashboards_dir) return dashboards + @staticmethod + def get_dashboard(name: str, dashboards_dir: Optional[str] = None) -> Optional[str]: + global _default_config_instance + if _default_config_instance is None: + _default_config_instance = Config() + return _default_config_instance._get_dashboard(name, dashboards_dir) + + @classmethod + def get_dashboards(cls, dashboards_dir: Optional[str] = None) -> dict: + global _default_config_instance + if _default_config_instance is None: + _default_config_instance = Config() + return _default_config_instance._get_dashboards(dashboards_dir) + def _init_dashboards(self, dashboards_dir: str): - self.dashboards = self.get_dashboards(dashboards_dir) + self.dashboards = self._get_dashboards(dashboards_dir) @staticmethod def get_backends(): @@ -400,4 +467,5 @@ class Config(object): return _default_config_instance._config + # vim:sw=4:ts=4:et: diff --git a/platypush/config/config.auto.yaml b/platypush/config/config.auto.yaml index 466458baa..e9bbcd0ce 100644 --- a/platypush/config/config.auto.yaml +++ b/platypush/config/config.auto.yaml @@ -3,5 +3,4 @@ # instead backend.http: - port: 8008 - websocket_port: 8009 + enabled: True diff --git a/platypush/config/config.default.yaml b/platypush/config/config.yaml similarity index 100% rename from platypush/config/config.default.yaml rename to platypush/config/config.yaml From fee5fc4ae0d042cdb96a0087ef04465edfc77853 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Wed, 27 Apr 2022 14:52:41 +0200 Subject: [PATCH 09/12] HTTP backend dependencies moved from optional to required If Platypush is supposed to work also without a manually created `config.yaml`, and the HTTP backend is enabled by default in that configuration, then Flask and companions should be among the required dependencies. --- platypush/backend/http/__init__.py | 147 ++++++++++++++++++--------- platypush/backend/http/manifest.yaml | 3 - requirements.txt | 1 + setup.py | 42 +++++--- 4 files changed, 132 insertions(+), 61 deletions(-) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 2e3cb3960..21c649887 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -91,14 +91,16 @@ class HttpBackend(Backend): other music plugin enabled. --> <Music class="col-3" /> - <!-- Show current date, time and weather. It requires a `weather` plugin or backend enabled --> + <!-- Show current date, time and weather. + It requires a `weather` plugin or backend enabled --> <DateTimeWeather class="col-3" /> </Row> <!-- Display the following widgets on a second row --> <Row> <!-- Show a carousel of images from a local folder. For security reasons, the folder must be - explicitly exposed as an HTTP resource through the backend `resource_dirs` attribute. --> + explicitly exposed as an HTTP resource through the backend + `resource_dirs` attribute. --> <ImageCarousel class="col-6" img-dir="/mnt/hd/photos/carousel" /> <!-- Show the news headlines parsed from a list of RSS feed and stored locally through the @@ -151,11 +153,7 @@ class HttpBackend(Backend): Requires: - * **flask** (``pip install flask``) - * **bcrypt** (``pip install bcrypt``) - * **magic** (``pip install python-magic``), optional, for MIME type - support if you want to enable media streaming - * **gunicorn** (``pip install gunicorn``) - optional but recommended. + * **gunicorn** (``pip install gunicorn``) - optional, to run the Platypush webapp over uWSGI. By default the Platypush web server will run in a process spawned on the fly by the HTTP backend. However, being a @@ -174,12 +172,22 @@ class HttpBackend(Backend): _DEFAULT_HTTP_PORT = 8008 _DEFAULT_WEBSOCKET_PORT = 8009 - def __init__(self, port=_DEFAULT_HTTP_PORT, - websocket_port=_DEFAULT_WEBSOCKET_PORT, - bind_address='0.0.0.0', - disable_websocket=False, resource_dirs=None, - ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None, - maps=None, run_externally=False, uwsgi_args=None, **kwargs): + def __init__( + self, + port=_DEFAULT_HTTP_PORT, + websocket_port=_DEFAULT_WEBSOCKET_PORT, + bind_address='0.0.0.0', + disable_websocket=False, + resource_dirs=None, + ssl_cert=None, + ssl_key=None, + ssl_cafile=None, + ssl_capath=None, + maps=None, + run_externally=False, + uwsgi_args=None, + **kwargs + ): """ :param port: Listen port for the web server (default: 8008) :type port: int @@ -246,26 +254,37 @@ class HttpBackend(Backend): self.bind_address = bind_address if resource_dirs: - self.resource_dirs = {name: os.path.abspath( - os.path.expanduser(d)) for name, d in resource_dirs.items()} + self.resource_dirs = { + name: os.path.abspath(os.path.expanduser(d)) + for name, d in resource_dirs.items() + } else: self.resource_dirs = {} self.active_websockets = set() self.run_externally = run_externally self.uwsgi_args = uwsgi_args or [] - self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert, - ssl_key=ssl_key, - ssl_cafile=ssl_cafile, - ssl_capath=ssl_capath) \ - if ssl_cert else None + self.ssl_context = ( + get_ssl_server_context( + ssl_cert=ssl_cert, + ssl_key=ssl_key, + ssl_cafile=ssl_cafile, + ssl_capath=ssl_capath, + ) + if ssl_cert + else None + ) if self.uwsgi_args: - self.uwsgi_args = [str(_) for _ in self.uwsgi_args] + \ - ['--module', 'platypush.backend.http.uwsgi', '--enable-threads'] + self.uwsgi_args = [str(_) for _ in self.uwsgi_args] + [ + '--module', + 'platypush.backend.http.uwsgi', + '--enable-threads', + ] - self.local_base_url = '{proto}://localhost:{port}'.\ - format(proto=('https' if ssl_cert else 'http'), port=self.port) + self.local_base_url = '{proto}://localhost:{port}'.format( + proto=('https' if ssl_cert else 'http'), port=self.port + ) self._websocket_lock_timeout = 10 self._websocket_lock = threading.RLock() @@ -275,7 +294,7 @@ class HttpBackend(Backend): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') def on_stop(self): - """ On backend stop """ + """On backend stop""" super().on_stop() self.logger.info('Received STOP event on HttpBackend') @@ -284,7 +303,9 @@ class HttpBackend(Backend): self.server_proc.kill() self.server_proc.wait(timeout=10) if self.server_proc.poll() is not None: - self.logger.info('HTTP server process may be still alive at termination') + self.logger.info( + 'HTTP server process may be still alive at termination' + ) else: self.logger.info('HTTP server process terminated') else: @@ -293,17 +314,25 @@ class HttpBackend(Backend): if self.server_proc.is_alive(): self.server_proc.kill() if self.server_proc.is_alive(): - self.logger.info('HTTP server process may be still alive at termination') + self.logger.info( + 'HTTP server process may be still alive at termination' + ) else: self.logger.info('HTTP server process terminated') - if self.websocket_thread and self.websocket_thread.is_alive() and self._websocket_loop: + if ( + self.websocket_thread + and self.websocket_thread.is_alive() + and self._websocket_loop + ): self._websocket_loop.stop() self.logger.info('HTTP websocket service terminated') def _acquire_websocket_lock(self, ws): try: - acquire_ok = self._websocket_lock.acquire(timeout=self._websocket_lock_timeout) + acquire_ok = self._websocket_lock.acquire( + timeout=self._websocket_lock_timeout + ) if not acquire_ok: raise TimeoutError('Websocket lock acquire timeout') @@ -313,13 +342,19 @@ class HttpBackend(Backend): finally: self._websocket_lock.release() - acquire_ok = self._websocket_locks[addr].acquire(timeout=self._websocket_lock_timeout) + acquire_ok = self._websocket_locks[addr].acquire( + timeout=self._websocket_lock_timeout + ) if not acquire_ok: - raise TimeoutError('Websocket on address {} not ready to receive data'.format(addr)) + raise TimeoutError( + 'Websocket on address {} not ready to receive data'.format(addr) + ) def _release_websocket_lock(self, ws): try: - acquire_ok = self._websocket_lock.acquire(timeout=self._websocket_lock_timeout) + acquire_ok = self._websocket_lock.acquire( + timeout=self._websocket_lock_timeout + ) if not acquire_ok: raise TimeoutError('Websocket lock acquire timeout') @@ -327,12 +362,15 @@ class HttpBackend(Backend): if addr in self._websocket_locks: self._websocket_locks[addr].release() except Exception as e: - self.logger.warning('Unhandled exception while releasing websocket lock: {}'.format(str(e))) + self.logger.warning( + 'Unhandled exception while releasing websocket lock: {}'.format(str(e)) + ) finally: self._websocket_lock.release() def notify_web_clients(self, event): - """ Notify all the connected web clients (over websocket) of a new event """ + """Notify all the connected web clients (over websocket) of a new event""" + async def send_event(ws): try: self._acquire_websocket_lock(ws) @@ -349,26 +387,35 @@ class HttpBackend(Backend): try: loop.run_until_complete(send_event(_ws)) except ConnectionClosed: - self.logger.warning('Websocket client {} connection lost'.format(_ws.remote_address)) + self.logger.warning( + 'Websocket client {} connection lost'.format(_ws.remote_address) + ) self.active_websockets.remove(_ws) if _ws.remote_address in self._websocket_locks: del self._websocket_locks[_ws.remote_address] def websocket(self): - """ Websocket main server """ + """Websocket main server""" set_thread_name('WebsocketServer') async def register_websocket(websocket, path): - address = websocket.remote_address if websocket.remote_address \ + address = ( + websocket.remote_address + if websocket.remote_address else '<unknown client>' + ) - self.logger.info('New websocket connection from {} on path {}'.format(address, path)) + self.logger.info( + 'New websocket connection from {} on path {}'.format(address, path) + ) self.active_websockets.add(websocket) try: await websocket.recv() except ConnectionClosed: - self.logger.info('Websocket client {} closed connection'.format(address)) + self.logger.info( + 'Websocket client {} closed connection'.format(address) + ) self.active_websockets.remove(websocket) if address in self._websocket_locks: del self._websocket_locks[address] @@ -379,8 +426,13 @@ class HttpBackend(Backend): self._websocket_loop = get_or_create_event_loop() self._websocket_loop.run_until_complete( - websocket_serve(register_websocket, self.bind_address, self.websocket_port, - **websocket_args)) + websocket_serve( + register_websocket, + self.bind_address, + self.websocket_port, + **websocket_args + ) + ) self._websocket_loop.run_forever() def _start_web_server(self): @@ -415,8 +467,9 @@ class HttpBackend(Backend): self.websocket_thread.start() if not self.run_externally: - self.server_proc = Process(target=self._start_web_server(), - name='WebServer') + self.server_proc = Process( + target=self._start_web_server(), name='WebServer' + ) self.server_proc.start() self.server_proc.join() elif self.uwsgi_args: @@ -424,9 +477,11 @@ class HttpBackend(Backend): self.logger.info('Starting uWSGI with arguments {}'.format(uwsgi_cmd)) self.server_proc = subprocess.Popen(uwsgi_cmd) else: - self.logger.info('The web server is configured to be launched externally but ' + - 'no uwsgi_args were provided. Make sure that you run another external service' + - 'for the webserver (e.g. nginx)') + self.logger.info( + 'The web server is configured to be launched externally but ' + + 'no uwsgi_args were provided. Make sure that you run another external service' + + 'for the webserver (e.g. nginx)' + ) # vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/manifest.yaml b/platypush/backend/http/manifest.yaml index a08655372..3ad89d81a 100644 --- a/platypush/backend/http/manifest.yaml +++ b/platypush/backend/http/manifest.yaml @@ -2,9 +2,6 @@ manifest: events: {} install: pip: - - flask - - bcrypt - - python-magic - gunicorn package: platypush.backend.http type: backend diff --git a/requirements.txt b/requirements.txt index 2fc1c8fbf..0ac8538f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -20,3 +20,4 @@ zeroconf paho-mqtt websocket-client croniter +python-magic diff --git a/setup.py b/setup.py index 63e933fe2..f76bb7ae8 100755 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def readfile(fname): def pkg_files(dir): paths = [] # noinspection PyShadowingNames - for (path, dirs, files) in os.walk(dir): + for (path, _, files) in os.walk(dir): for file in files: paths.append(os.path.join('..', path, file)) return paths @@ -68,17 +68,21 @@ setup( 'pyjwt', 'marshmallow', 'frozendict', + 'flask', + 'bcrypt', + 'python-magic', ], - extras_require={ # Support for thread custom name 'threadname': ['python-prctl'], # Support for Kafka backend and plugin 'kafka': ['kafka-python'], # Support for Pushbullet backend and plugin - 'pushbullet': ['pushbullet.py @ https://github.com/rbrcsk/pushbullet.py/tarball/master'], - # Support for HTTP backend - 'http': ['flask', 'bcrypt', 'python-magic', 'gunicorn'], + 'pushbullet': [ + 'pushbullet.py @ https://github.com/rbrcsk/pushbullet.py/tarball/master' + ], + # Support for HTTP backend over uWSGI + 'http': ['gunicorn'], # Support for MQTT backends 'mqtt': ['paho-mqtt'], # Support for RSS feeds parser @@ -90,7 +94,11 @@ setup( # Support for MPD/Mopidy music server plugin and backend 'mpd': ['python-mpd2'], # Support for Google text2speech plugin - 'google-tts': ['oauth2client', 'google-api-python-client', 'google-cloud-texttospeech'], + 'google-tts': [ + 'oauth2client', + 'google-api-python-client', + 'google-cloud-texttospeech', + ], # Support for OMXPlayer plugin 'omxplayer': ['omxplayer-wrapper'], # Support for YouTube @@ -138,7 +146,8 @@ setup( # Support for web media subtitles 'subtitles': [ 'webvtt-py', - 'python-opensubtitles @ https://github.com/agonzalezro/python-opensubtitles/tarball/master'], + 'python-opensubtitles @ https://github.com/agonzalezro/python-opensubtitles/tarball/master', + ], # Support for mpv player plugin 'mpv': ['python-mpv'], # Support for NFC tags @@ -156,14 +165,21 @@ setup( # Support for Dropbox integration 'dropbox': ['dropbox'], # Support for Leap Motion backend - 'leap': ['leap-sdk @ https://github.com/BlackLight/leap-sdk-python3/tarball/master'], + 'leap': [ + 'leap-sdk @ https://github.com/BlackLight/leap-sdk-python3/tarball/master' + ], # Support for Flic buttons - 'flic': ['flic @ https://github.com/50ButtonsEach/fliclib-linux-hci/tarball/master'], + 'flic': [ + 'flic @ https://github.com/50ButtonsEach/fliclib-linux-hci/tarball/master' + ], # Support for Alexa/Echo plugin 'alexa': ['avs @ https://github.com/BlackLight/avs/tarball/master'], # Support for bluetooth devices - 'bluetooth': ['pybluez', 'gattlib', - 'pyobex @ https://github.com/BlackLight/PyOBEX/tarball/master'], + 'bluetooth': [ + 'pybluez', + 'gattlib', + 'pyobex @ https://github.com/BlackLight/PyOBEX/tarball/master', + ], # Support for TP-Link devices 'tplink': ['pyHS100'], # Support for PMW3901 2-Dimensional Optical Flow Sensor @@ -231,7 +247,9 @@ setup( # Support for Twilio integration 'twilio': ['twilio'], # Support for DHT11/DHT22/AM2302 temperature/humidity sensors - 'dht': ['Adafruit_Python_DHT @ git+https://github.com/adafruit/Adafruit_Python_DHT'], + 'dht': [ + 'Adafruit_Python_DHT @ git+https://github.com/adafruit/Adafruit_Python_DHT' + ], # Support for LCD display integration 'lcd': ['RPi.GPIO', 'RPLCD'], # Support for IMAP mail integration From 820a1c81846fc84ec572615fa2c2cd96f0bfb89d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Wed, 27 Apr 2022 23:25:14 +0200 Subject: [PATCH 10/12] Don't raise a pytest warning upon the asyncio "No event loop" warning --- platypush/context/__init__.py | 25 ++++++++++++++----------- pyproject.toml | 4 ++++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/platypush/context/__init__.py b/platypush/context/__init__.py index bcadf289a..0d6876376 100644 --- a/platypush/context/__init__.py +++ b/platypush/context/__init__.py @@ -26,14 +26,14 @@ main_bus = None def register_backends(bus=None, global_scope=False, **kwargs): - """ Initialize the backend objects based on the configuration and returns + """Initialize the backend objects based on the configuration and returns a name -> backend_instance map. Params: bus -- If specific (it usually should), the messages processed by the backends will be posted on this bus. kwargs -- Any additional key-value parameters required to initialize the backends - """ + """ global main_bus if bus: @@ -57,8 +57,7 @@ def register_backends(bus=None, global_scope=False, **kwargs): b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs) backends[name] = b except AttributeError as e: - logger.warning('No such class in {}: {}'.format( - module.__name__, cls_name)) + logger.warning('No such class in {}: {}'.format(module.__name__, cls_name)) raise RuntimeError(e) return backends @@ -74,15 +73,15 @@ def register_plugins(bus=None): def get_backend(name): - """ Returns the backend instance identified by name if it exists """ + """Returns the backend instance identified by name if it exists""" global backends return backends.get(name) def get_plugin(plugin_name, reload=False): - """ Registers a plugin instance by name if not registered already, or - returns the registered plugin instance""" + """Registers a plugin instance by name if not registered already, or + returns the registered plugin instance""" global plugins global plugins_init_locks @@ -104,8 +103,9 @@ def get_plugin(plugin_name, reload=False): cls_name += token.title() cls_name += 'Plugin' - plugin_conf = Config.get_plugins()[plugin_name] \ - if plugin_name in Config.get_plugins() else {} + plugin_conf = ( + Config.get_plugins()[plugin_name] if plugin_name in Config.get_plugins() else {} + ) if 'disabled' in plugin_conf: if plugin_conf['disabled'] is True: @@ -120,7 +120,9 @@ def get_plugin(plugin_name, reload=False): try: plugin_class = getattr(plugin, cls_name) except AttributeError as e: - logger.warning('No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e))) + logger.warning( + 'No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e)) + ) raise RuntimeError(e) with plugins_init_locks[plugin_name]: @@ -137,13 +139,14 @@ def get_bus() -> Bus: return main_bus from platypush.bus.redis import RedisBus + return RedisBus() def get_or_create_event_loop(): try: loop = asyncio.get_event_loop() - except RuntimeError: + except (DeprecationWarning, RuntimeError): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) diff --git a/pyproject.toml b/pyproject.toml index e7c6caf61..a5cd7464e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,3 +2,7 @@ skip-string-normalization = true skip-numeric-underscore-normalization = true +[tool.pytest.ini_options] +filterwarnings = [ + 'ignore:There is no current event loop:DeprecationWarning', +] From 41d0725ebf643c99e888fd112f4440e47b4c9ee7 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Thu, 28 Apr 2022 00:57:49 +0200 Subject: [PATCH 11/12] Fix for #217 The cron scheduler has been made more robust against changes in the system clock (caused by e.g. DST changes, NTP syncs or manual setting). A more granular management for cronjob events has been introduced, now supporting a `TIME_SYNC` event besides the usual `STOP`. When the cron scheduler detects a system clock drift (i.e. the timestamp offset before and after a blocking wait is >1 sec) then all the cronjobs are notified and forced to refresh their state. --- platypush/cron/scheduler.py | 100 ++++++++++++++++++++++++++------- tests/etc/scripts/test_cron.py | 41 ++++++++------ tests/test_cron.py | 70 ++++++++++++++--------- 3 files changed, 148 insertions(+), 63 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 735955bb4..74eaaae5e 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -7,11 +7,20 @@ import croniter from dateutil.tz import gettz from platypush.procedure import Procedure -from platypush.utils import is_functional_cron +from platypush.utils import is_functional_cron, set_thread_name logger = logging.getLogger('platypush:cron') +def get_now() -> datetime.datetime: + """ + :return: A timezone-aware representation of `now` + """ + return datetime.datetime.now().replace( + tzinfo=gettz() + ) # lgtm [py/call-to-non-callable] + + class CronjobState(enum.IntEnum): IDLE = 0 WAIT = 1 @@ -20,21 +29,36 @@ class CronjobState(enum.IntEnum): ERROR = 4 +class CronjobEvent(enum.IntEnum): + NONE = 0 + STOP = 1 + TIME_SYNC = 2 + + class Cronjob(threading.Thread): def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression self.name = name self.state = CronjobState.IDLE - self._should_stop = threading.Event() + self._event = threading.Event() + self._event_type = CronjobEvent.NONE + self._event_lock = threading.RLock() - if isinstance(actions, dict) or isinstance(actions, list): - self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) + if isinstance(actions, (list, dict)): + self.actions = Procedure.build( + name=name + '__Cron', _async=False, requests=actions + ) else: self.actions = actions + def notify(self, event: CronjobEvent): + with self._event_lock: + self._event_type = event + self._event.set() + def run(self): - self.state = CronjobState.WAIT + set_thread_name(f'cron:{self.name}') self.wait() if self.should_stop(): return @@ -57,26 +81,38 @@ class Cronjob(threading.Thread): self.state = CronjobState.ERROR def wait(self): - now = datetime.datetime.now().replace(tzinfo=gettz()) # lgtm [py/call-to-non-callable] - cron = croniter.croniter(self.cron_expression, now) - next_run = cron.get_next() - self._should_stop.wait(next_run - now.timestamp()) + with self._event_lock: + self.state = CronjobState.WAIT + self._event.clear() + self._event_type = CronjobEvent.TIME_SYNC + + while self._event_type == CronjobEvent.TIME_SYNC: + now = get_now() + self._event_type = CronjobEvent.NONE + cron = croniter.croniter(self.cron_expression, now) + next_run = cron.get_next() + self._event.wait(max(0, next_run - now.timestamp())) def stop(self): - self._should_stop.set() + self._event_type = CronjobEvent.STOP + self._event.set() def should_stop(self): - return self._should_stop.is_set() + return self._event_type == CronjobEvent.STOP class CronScheduler(threading.Thread): - def __init__(self, jobs): + def __init__(self, jobs, poll_seconds: float = 0.5): super().__init__() self.jobs_config = jobs self._jobs = {} + self._poll_seconds = max(1e-3, poll_seconds) self._should_stop = threading.Event() - logger.info('Cron scheduler initialized with {} jobs'. - format(len(self.jobs_config.keys()))) + logger.info( + 'Cron scheduler initialized with {} jobs'.format( + len(self.jobs_config.keys()) + ) + ) def _get_job(self, name, config): job = self._jobs.get(name) @@ -84,14 +120,21 @@ class CronScheduler(threading.Thread): return job if isinstance(config, dict): - self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'], - actions=config['actions']) + self._jobs[name] = Cronjob( + name=name, + cron_expression=config['cron_expression'], + actions=config['actions'], + ) elif is_functional_cron(config): - self._jobs[name] = Cronjob(name=name, cron_expression=config.cron_expression, - actions=config) + self._jobs[name] = Cronjob( + name=name, cron_expression=config.cron_expression, actions=config + ) else: - raise AssertionError('Expected type dict or function for cron {}, got {}'.format( - name, type(config))) + raise AssertionError( + 'Expected type dict or function for cron {}, got {}'.format( + name, type(config) + ) + ) return self._jobs[name] @@ -112,7 +155,22 @@ class CronScheduler(threading.Thread): if job.state == CronjobState.IDLE: job.start() - self._should_stop.wait(timeout=0.5) + t_before_wait = get_now().timestamp() + self._should_stop.wait(timeout=self._poll_seconds) + t_after_wait = get_now().timestamp() + time_drift = abs(t_after_wait - t_before_wait) - self._poll_seconds + + if not self.should_stop() and time_drift > 1: + # If the system clock has been adjusted by more than one second + # (e.g. because of DST change or NTP sync) then ensure that the + # registered cronjobs are synchronized with the new datetime + logger.info( + 'System clock drift detected: %f secs. Synchronizing the cronjobs', + time_drift, + ) + + for job in self._jobs.values(): + job.notify(CronjobEvent.TIME_SYNC) logger.info('Terminating cron scheduler') diff --git a/tests/etc/scripts/test_cron.py b/tests/etc/scripts/test_cron.py index 6618dcb3f..172dc363d 100644 --- a/tests/etc/scripts/test_cron.py +++ b/tests/etc/scripts/test_cron.py @@ -2,25 +2,34 @@ import datetime from platypush.cron import cron -from tests.test_cron import tmp_files, tmp_files_ready, \ - test_timeout, expected_cron_file_content +from tests.test_cron import test_timeout, cron_queue + + +def make_cron_expr(cron_time: datetime.datetime): + return '{min} {hour} {day} {month} * {sec}'.format( + min=cron_time.minute, + hour=cron_time.hour, + day=cron_time.day, + month=cron_time.month, + sec=cron_time.second, + ) + # Prepare a cronjob that should start test_timeout/2 seconds from the application start -cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout/2) -cron_expr = '{min} {hour} {day} {month} * {sec}'.format( - min=cron_time.minute, hour=cron_time.hour, day=cron_time.day, - month=cron_time.month, sec=cron_time.second) +cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout / 2) -@cron(cron_expr) +@cron(make_cron_expr(cron_time)) def cron_test(**_): - """ - Simple cronjob that awaits for ``../test_cron.py`` to be ready and writes the expected - content to the monitored temporary file. - """ - files_ready = tmp_files_ready.wait(timeout=test_timeout) - assert files_ready, \ - 'The test did not prepare the temporary files within {} seconds'.format(test_timeout) + cron_queue.put('cron_test') - with open(tmp_files[0], 'w') as f: - f.write(expected_cron_file_content) + +# Prepare another cronjob that should start 1hr + test_timeout/2 seconds from the application start +cron_time = datetime.datetime.now() + datetime.timedelta( + hours=1, seconds=test_timeout / 2 +) + + +@cron(make_cron_expr(cron_time)) +def cron_1hr_test(**_): + cron_queue.put('cron_1hr_test') diff --git a/tests/test_cron.py b/tests/test_cron.py index c7a76a873..5bc41e636 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -1,43 +1,61 @@ -import os +import datetime +import queue import pytest -import tempfile -import threading import time -tmp_files = [] -tmp_files_ready = threading.Event() +from dateutil.tz import gettz +from mock import patch + test_timeout = 10 -expected_cron_file_content = 'The cronjob ran successfully!' +cron_queue = queue.Queue() -@pytest.fixture(scope='module', autouse=True) -def tmp_file(*_): - tmp_file = tempfile.NamedTemporaryFile(prefix='platypush-test-cron-', - suffix='.txt', delete=False) - tmp_files.append(tmp_file.name) - tmp_files_ready.set() - yield tmp_file.name +class MockDatetime(datetime.datetime): + timedelta = datetime.timedelta() - for f in tmp_files: - if os.path.isfile(f): - os.unlink(f) + @classmethod + def now(cls): + return super().now(tz=gettz()) + cls.timedelta -def test_cron_execution(tmp_file): +def _test_cron_queue(expected_msg: str): + msg = None + test_start = time.time() + while time.time() - test_start <= test_timeout and msg != expected_msg: + try: + msg = cron_queue.get(block=True, timeout=test_timeout) + except queue.Empty: + break + + assert msg == expected_msg, 'The expected cronjob has not been executed' + + +def test_cron_execution(): """ Test that the cronjob in ``../etc/scripts/test_cron.py`` runs successfully. """ - actual_cron_file_content = None - test_start = time.time() + _test_cron_queue('cron_test') - while actual_cron_file_content != expected_cron_file_content and \ - time.time() - test_start < test_timeout: - with open(tmp_file, 'r') as f: - actual_cron_file_content = f.read() - time.sleep(0.5) - assert actual_cron_file_content == expected_cron_file_content, \ - 'cron_test failed to run within {} seconds'.format(test_timeout) +def test_cron_execution_upon_system_clock_change(): + """ + Test that the cronjob runs at the right time even upon DST or other + system clock changes. + """ + # Mock datetime.datetime with a class that has overridable timedelta + patcher = patch('datetime.datetime', MockDatetime) + + try: + patcher.start() + time.sleep(1) + # Simulate a +1hr shift on the system clock + MockDatetime.timedelta = datetime.timedelta(hours=1) + time.sleep(1) + finally: + patcher.stop() + + # Ensure that the cronjob that was supposed to run in an hour is now running + _test_cron_queue('cron_1hr_test') if __name__ == '__main__': From ba23eb72807625a13aa33e448ae2026e75917e3b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello <info@fabiomanganiello.com> Date: Thu, 28 Apr 2022 01:04:30 +0200 Subject: [PATCH 12/12] Small LINT fix --- platypush/cron/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 74eaaae5e..0671cfd0d 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -17,8 +17,8 @@ def get_now() -> datetime.datetime: :return: A timezone-aware representation of `now` """ return datetime.datetime.now().replace( - tzinfo=gettz() - ) # lgtm [py/call-to-non-callable] + tzinfo=gettz() # lgtm [py/call-to-non-callable] + ) class CronjobState(enum.IntEnum):