diff --git a/docs/source/platypush/responses/pihole.rst b/docs/source/platypush/responses/pihole.rst deleted file mode 100644 index 0b94da43d..000000000 --- a/docs/source/platypush/responses/pihole.rst +++ /dev/null @@ -1,5 +0,0 @@ -``pihole`` -===================================== - -.. automodule:: platypush.message.response.pihole - :members: diff --git a/docs/source/responses.rst b/docs/source/responses.rst index 17b210add..925f56c82 100644 --- a/docs/source/responses.rst +++ b/docs/source/responses.rst @@ -7,7 +7,6 @@ Responses :caption: Responses: platypush/responses/google.drive.rst - platypush/responses/pihole.rst platypush/responses/printer.cups.rst platypush/responses/qrcode.rst platypush/responses/ssh.rst diff --git a/platypush/message/response/pihole.py b/platypush/message/response/pihole.py deleted file mode 100644 index 2ca2321e0..000000000 --- a/platypush/message/response/pihole.py +++ /dev/null @@ -1,38 +0,0 @@ -from platypush.message.response import Response - - -class PiholeStatusResponse(Response): - def __init__(self, - server: str, - status: str, - ads_percentage: float, - blocked: int, - cached: int, - domain_count: int, - forwarded: int, - queries: int, - total_clients: int, - total_queries: int, - unique_clients: int, - unique_domains: int, - version: str, - *args, - **kwargs): - super().__init__(*args, output={ - 'server': server, - 'status': status, - 'ads_percentage': ads_percentage, - 'blocked': blocked, - 'cached': cached, - 'domain_count': domain_count, - 'forwarded': forwarded, - 'queries': queries, - 'total_clients': total_clients, - 'total_queries': total_queries, - 'unique_clients': unique_clients, - 'unique_domains': unique_domains, - 'version': version, - }, **kwargs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/pihole/__init__.py b/platypush/plugins/pihole/__init__.py index 45b088ebf..da0b880fd 100644 --- a/platypush/plugins/pihole/__init__.py +++ b/platypush/plugins/pihole/__init__.py @@ -1,25 +1,27 @@ import hashlib import requests -from enum import Enum -from typing import Optional, Union, List +from typing import Any, Dict, Optional, Union, List -from platypush.message.response.pihole import PiholeStatusResponse +from platypush.schemas.pihole import PiholeStatusSchema from platypush.plugins import Plugin, action -class PiholeStatus(Enum): - ENABLED = 'enabled' - DISABLED = 'disabled' - - class PiholePlugin(Plugin): """ Plugin for interacting with a `Pi-Hole `_ DNS server for advertisement and content blocking. """ - def __init__(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None, - ssl: bool = False, verify_ssl: bool = True, **kwargs): + def __init__( + self, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: bool = False, + verify_ssl: bool = True, + timeout: int = 10, + **kwargs, + ): """ :param server: Default Pi-hole server IP address. :param password: Password for the default Pi-hole server. @@ -27,6 +29,7 @@ class PiholePlugin(Plugin): http://pi-hole-server/admin/scripts/pi-hole/php/api_token.php :param ssl: Set to true if the host uses HTTPS (default: False). :param verify_ssl: Set to False to disable SSL certificate check. + :param timeout: Default timeout for the HTTP requests. """ super().__init__(**kwargs) self.server = server @@ -34,23 +37,36 @@ class PiholePlugin(Plugin): self.api_key = api_key self.ssl = ssl self.verify_ssl = verify_ssl + self.timeout = timeout @staticmethod - def _get_token(password: Optional[str] = None, api_key: Optional[str] = None) -> str: + def _get_token( + password: Optional[str] = None, api_key: Optional[str] = None + ) -> str: if not password: return api_key or '' - return hashlib.sha256(hashlib.sha256(str(password).encode()).hexdigest().encode()).hexdigest() # lgtm [py/weak-sensitive-data-hashing] + return hashlib.sha256( + hashlib.sha256(str(password).encode()).hexdigest().encode() + ).hexdigest() # lgtm [py/weak-sensitive-data-hashing] - def _get_url(self, name: str, server: Optional[str] = None, password: Optional[str] = None, - ssl: Optional[bool] = None, api_key: Optional[str] = None, **kwargs) -> str: + def _get_url( + self, + name: str, + server: Optional[str] = None, + password: Optional[str] = None, + ssl: Optional[bool] = None, + api_key: Optional[str] = None, + **kwargs, + ) -> str: if not server: server = self.server password = password or self.password api_key = api_key or self.api_key ssl = ssl if ssl is not None else self.ssl - args = '&'.join(['{key}={value}'.format(key=key, value=value) for key, value in kwargs.items() - if value is not None]) + args = '&'.join( + [f'{key}={value}' for key, value in kwargs.items() if value is not None] + ) if args: args = '&' + args @@ -59,17 +75,20 @@ class PiholePlugin(Plugin): if token: token = '&auth=' + token - return 'http{ssl}://{server}/admin/api.php?{name}{token}{args}'.format( - ssl='s' if ssl else '', server=server, name=name, token=token, args=args) + return f'http{"s" if ssl else ""}://{server}/admin/api.php?{name}{token}{args}' @staticmethod def _normalize_number(n: Union[str, int]): return int(str(n).replace(',', '')) @action - def status(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None, - ssl: bool = None) \ - -> PiholeStatusResponse: + def status( + self, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ) -> dict: """ Get the status and statistics of a running Pi-hole server. @@ -77,32 +96,64 @@ class PiholePlugin(Plugin): :param password: Server password (default: default configured ``password`` value). :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). - :return: :class:`platypush.message.response.pihole.PiholeStatusResponse` + :return: .. schema:: pihole.PiholeStatusSchema """ - status = requests.get(self._get_url('summary', server=server, password=password, api_key=api_key, - ssl=ssl), verify=self.verify_ssl).json() - version = requests.get(self._get_url('versions', server=server, password=password, api_key=api_key, - ssl=ssl), verify=self.verify_ssl).json() + status = requests.get( + self._get_url( + 'summary', server=server, password=password, api_key=api_key, ssl=ssl + ), + verify=self.verify_ssl, + timeout=self.timeout, + ).json() - return PiholeStatusResponse( - server=server or self.server, - status=PiholeStatus(status.get('status')).value, - ads_percentage=float(status.get('ads_percentage_today')), - blocked=self._normalize_number(status.get('ads_blocked_today')), - cached=self._normalize_number(status.get('queries_cached')), - domain_count=self._normalize_number(status.get('domains_being_blocked')), - forwarded=self._normalize_number(status.get('queries_forwarded')), - queries=self._normalize_number(status.get('dns_queries_today')), - total_clients=self._normalize_number(status.get('clients_ever_seen')), - total_queries=self._normalize_number(status.get('dns_queries_all_types')), - unique_clients=self._normalize_number(status.get('unique_clients')), - unique_domains=self._normalize_number(status.get('unique_domains')), - version=version.get('core_current'), + version = requests.get( + self._get_url( + 'versions', server=server, password=password, api_key=api_key, ssl=ssl + ), + verify=self.verify_ssl, + timeout=self.timeout, + ).json() + + return dict( + PiholeStatusSchema().dump( + { + 'server': server or self.server, + 'status': status.get('status'), + 'ads_percentage': float(status.get('ads_percentage_today')), + 'blocked': self._normalize_number(status.get('ads_blocked_today')), + 'cached': self._normalize_number(status.get('queries_cached')), + 'domain_count': self._normalize_number( + status.get('domains_being_blocked') + ), + 'forwarded': self._normalize_number( + status.get('queries_forwarded') + ), + 'queries': self._normalize_number(status.get('dns_queries_today')), + 'total_clients': self._normalize_number( + status.get('clients_ever_seen') + ), + 'total_queries': self._normalize_number( + status.get('dns_queries_all_types') + ), + 'unique_clients': self._normalize_number( + status.get('unique_clients') + ), + 'unique_domains': self._normalize_number( + status.get('unique_domains') + ), + 'version': version.get('core_current'), + } + ) ) @action - def enable(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None, - ssl: bool = None): + def enable( + self, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Enable a Pi-hole server. @@ -111,20 +162,31 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - response = requests.get(self._get_url('enable', server=server, password=password, api_key=api_key, ssl=ssl), - verify=self.verify_ssl) + response = requests.get( + self._get_url( + 'enable', server=server, password=password, api_key=api_key, ssl=ssl + ), + verify=self.verify_ssl, + timeout=self.timeout, + ) try: status = (response.json() or {}).get('status') assert status == 'enabled', 'Wrong credentials' except Exception as e: - raise AssertionError('Could not enable the server: {}'.format(response.text or str(e))) + raise AssertionError(f'Could not enable the server: {response.text or e}') return response.json() @action - def disable(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None, - seconds: Optional[int] = None, ssl: bool = None): + def disable( + self, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + seconds: Optional[int] = None, + ssl: Optional[bool] = None, + ): """ Disable a Pi-hole server. @@ -135,46 +197,79 @@ class PiholePlugin(Plugin): :param ssl: Set to True if the server uses SSL (default: False). """ if seconds: - response = requests.get(self._get_url('', server=server, password=password, api_key=api_key, - ssl=ssl, disable=seconds), verify=self.verify_ssl) + response = requests.get( + self._get_url( + '', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + disable=seconds, + ), + verify=self.verify_ssl, + timeout=self.timeout, + ) else: - response = requests.get(self._get_url('disable', server=server, password=password, api_key=api_key, - ssl=ssl), verify=self.verify_ssl) + response = requests.get( + self._get_url( + 'disable', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ), + verify=self.verify_ssl, + timeout=self.timeout, + ) try: status = (response.json() or {}).get('status') assert status == 'disabled', 'Wrong credentials' except Exception as e: - raise AssertionError('Could not disable the server: {}'.format(response.text or str(e))) + raise AssertionError(f'Could not disable the server: {response.text or e}') return response.json() - def _list_manage(self, domain: str, list_name: str, endpoint: str, server: Optional[str] = None, - password: Optional[str] = None, api_key: Optional[str] = None, ssl: bool = None): - data = { - 'list': list_name, - 'domain': domain - } + def _list_manage( + self, + domain: str, + list_name: str, + endpoint: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): + data: Dict[str, Any] = {'list': list_name, 'domain': domain} if password or self.password: data['pw'] = password or self.password elif api_key or self.api_key: data['auth'] = api_key or self.api_key - base_url = "http{ssl}://{host}/admin/scripts/pi-hole/php/{endpoint}.php".format( - ssl='s' if ssl or self.ssl else '', host=server or self.server, endpoint=endpoint + base_url = ( + f"http{'s' if ssl or (ssl is None and self.ssl) else ''}://" + f"{server or self.server}/admin/scripts/pi-hole/php/{endpoint}.php" ) with requests.session() as s: s.get(base_url, verify=self.verify_ssl) - response = requests.post(base_url, data=data, verify=self.verify_ssl).text.strip() + response = requests.post( + base_url, data=data, verify=self.verify_ssl, timeout=self.timeout + ).text.strip() return {'response': response} - def _list_get(self, list_name: str, server: Optional[str] = None, ssl: bool = None) -> List[str]: - response = requests.get("http{ssl}://{host}/admin/scripts/pi-hole/php/get.php?list={list}".format( - ssl='s' if ssl or self.ssl else '', host=server or self.server, list=list_name - ), verify=self.verify_ssl).json() + def _list_get( + self, list_name: str, server: Optional[str] = None, ssl: Optional[bool] = None + ) -> List[str]: + response = requests.get( + f"http{'s' if ssl or (ssl is None and self.ssl) else ''}" + f"://{server or self.server}/admin/scripts/pi-hole/php/" + f"get.php?list={list_name}", + verify=self.verify_ssl, + timeout=10, + ).json() ret = set() for ll in response: @@ -182,7 +277,9 @@ class PiholePlugin(Plugin): return list(ret) @action - def get_blacklist(self, server: Optional[str] = None, ssl: bool = None) -> List[str]: + def get_blacklist( + self, server: Optional[str] = None, ssl: Optional[bool] = None + ) -> List[str]: """ Get the content of the blacklist. @@ -192,7 +289,9 @@ class PiholePlugin(Plugin): return self._list_get(list_name='black', server=server, ssl=ssl) @action - def get_whitelist(self, server: Optional[str] = None, ssl: bool = None) -> List[str]: + def get_whitelist( + self, server: Optional[str] = None, ssl: Optional[bool] = None + ) -> List[str]: """ Get the content of the whitelist. @@ -202,7 +301,9 @@ class PiholePlugin(Plugin): return self._list_get(list_name='white', server=server, ssl=ssl) @action - def get_list(self, list_name: str, server: Optional[str] = None, ssl: bool = None) -> List[str]: + def get_list( + self, list_name: str, server: Optional[str] = None, ssl: Optional[bool] = None + ) -> List[str]: """ Get the content of a list stored on the server. @@ -213,8 +314,14 @@ class PiholePlugin(Plugin): return self._list_get(list_name=list_name, server=server, ssl=ssl) @action - def blacklist_add(self, domain: str, server: Optional[str] = None, password: Optional[str] = None, - api_key: Optional[str] = None, ssl: bool = None): + def blacklist_add( + self, + domain: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Add a domain to the blacklist. @@ -224,12 +331,25 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - return self._list_manage(domain=domain, list_name='black', endpoint='add', server=server, password=password, - api_key=api_key, ssl=ssl) + return self._list_manage( + domain=domain, + list_name='black', + endpoint='add', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ) @action - def blacklist_remove(self, domain: str, server: Optional[str] = None, password: Optional[str] = None, - api_key: Optional[str] = None, ssl: bool = None): + def blacklist_remove( + self, + domain: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Remove a domain from the blacklist. @@ -239,12 +359,25 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - return self._list_manage(domain=domain, list_name='black', endpoint='sub', server=server, password=password, - api_key=api_key, ssl=ssl) + return self._list_manage( + domain=domain, + list_name='black', + endpoint='sub', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ) @action - def whitelist_add(self, domain: str, server: Optional[str] = None, password: Optional[str] = None, - api_key: Optional[str] = None, ssl: bool = None): + def whitelist_add( + self, + domain: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Add a domain to the whitelist. @@ -254,12 +387,25 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - return self._list_manage(domain=domain, list_name='white', endpoint='add', server=server, password=password, - api_key=api_key, ssl=ssl) + return self._list_manage( + domain=domain, + list_name='white', + endpoint='add', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ) @action - def whitelist_remove(self, domain: str, server: Optional[str] = None, password: Optional[str] = None, - api_key: Optional[str] = None, ssl: bool = None): + def whitelist_remove( + self, + domain: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Remove a domain from the whitelist. @@ -269,12 +415,26 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - return self._list_manage(domain=domain, list_name='white', endpoint='sub', server=server, password=password, - api_key=api_key, ssl=ssl) + return self._list_manage( + domain=domain, + list_name='white', + endpoint='sub', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ) @action - def list_add(self, list_name: str, domain: str, server: Optional[str] = None, password: Optional[str] = None, - api_key: Optional[str] = None, ssl: bool = None): + def list_add( + self, + list_name: str, + domain: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Add a domain to a custom list stored on the server. @@ -285,12 +445,26 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - return self._list_manage(domain=domain, list_name=list_name, endpoint='add', server=server, password=password, - api_key=api_key, ssl=ssl) + return self._list_manage( + domain=domain, + list_name=list_name, + endpoint='add', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ) @action - def list_remove(self, list_name: str, domain: str, server: Optional[str] = None, password: Optional[str] = None, - api_key: Optional[str] = None, ssl: bool = None): + def list_remove( + self, + list_name: str, + domain: str, + server: Optional[str] = None, + password: Optional[str] = None, + api_key: Optional[str] = None, + ssl: Optional[bool] = None, + ): """ Remove a domain from a custom list stored on the server. @@ -301,8 +475,15 @@ class PiholePlugin(Plugin): :param api_key: Server API key (default: default configured ``api_key`` value). :param ssl: Set to True if the server uses SSL (default: False). """ - return self._list_manage(domain=domain, list_name=list_name, endpoint='sub', server=server, password=password, - api_key=api_key, ssl=ssl) + return self._list_manage( + domain=domain, + list_name=list_name, + endpoint='sub', + server=server, + password=password, + api_key=api_key, + ssl=ssl, + ) # vim:sw=4:ts=4:et: diff --git a/platypush/schemas/pihole.py b/platypush/schemas/pihole.py new file mode 100644 index 000000000..6b152c915 --- /dev/null +++ b/platypush/schemas/pihole.py @@ -0,0 +1,140 @@ +from marshmallow import EXCLUDE, fields +from marshmallow.schema import Schema +from marshmallow.validate import OneOf + +from platypush.schemas import StrippedString + + +class PiholeStatusSchema(Schema): + """ + Schema for a Pi-hole status response. + + + "output": { + "server": "dns.fabiomanganiello.com", + "status": "enabled", + "ads_percentage": 6.7, + "blocked": 37191, + "cached": 361426, + "domain_count": 1656690, + "forwarded": 150187, + "queries": 552076, + "total_clients": 57, + "total_queries": 552076, + "unique_clients": 41, + "unique_domains": 39348, + "version": "5.18.2" + }, + """ + + class Meta: # type: ignore + """ + Exclude unknown fields. + """ + + unknown = EXCLUDE + + server = StrippedString( + required=True, + metadata={ + 'description': 'Hostname or IP of the Pi-hole server', + 'example': '192.168.1.254', + }, + ) + + status = fields.String( + required=True, + validate=OneOf(['enabled', 'disabled']), + metadata={ + 'description': 'Status of the Pi-hole server', + 'example': 'enabled', + }, + ) + + ads_percentage = fields.Float( + required=True, + metadata={ + 'description': 'Percentage of ads blocked by the Pi-hole server', + 'example': 6.7, + }, + ) + + blocked = fields.Integer( + required=True, + metadata={ + 'description': 'Number of blocked queries', + 'example': 37191, + }, + ) + + cached = fields.Integer( + required=True, + metadata={ + 'description': 'Number of cached queries', + 'example': 361426, + }, + ) + + domain_count = fields.Integer( + required=True, + metadata={ + 'description': 'Number of domains resolved the Pi-hole server', + 'example': 1656690, + }, + ) + + forwarded = fields.Integer( + required=True, + metadata={ + 'description': 'Number of forwarded queries', + 'example': 150187, + }, + ) + + queries = fields.Integer( + required=True, + metadata={ + 'description': 'Number of processed queries since the latest restart', + 'example': 552076, + }, + ) + + total_clients = fields.Integer( + required=True, + metadata={ + 'description': 'Number of connected clients', + 'example': 57, + }, + ) + + total_queries = fields.Integer( + required=True, + metadata={ + 'description': 'Total number of queries processed by the Pi-hole server', + 'example': 552076, + }, + ) + + unique_clients = fields.Integer( + required=True, + metadata={ + 'description': 'Number of unique IP addresses connected to the Pi-hole server', + 'example': 41, + }, + ) + + unique_domains = fields.Integer( + required=True, + metadata={ + 'description': 'Number of unique domains resolved by the Pi-hole server', + 'example': 39348, + }, + ) + + version = StrippedString( + required=True, + metadata={ + 'description': 'Version of the Pi-hole server', + 'example': '5.18.2', + }, + )