Added Pi-hole integration - closes #100

This commit is contained in:
Fabio Manganiello 2020-01-06 19:22:04 +01:00
parent e880f00498
commit 3271759fba
5 changed files with 364 additions and 9 deletions

View file

@ -0,0 +1,5 @@
``platypush.plugins.pihole``
============================
.. automodule:: platypush.plugins.pihole
:members:

View file

@ -76,6 +76,7 @@ Plugins
platypush/plugins/music.rst
platypush/plugins/music.mpd.rst
platypush/plugins/music.snapcast.rst
platypush/plugins/pihole.rst
platypush/plugins/ping.rst
platypush/plugins/printer.cups.rst
platypush/plugins/pushbullet.rst

View file

@ -0,0 +1,38 @@
from platypush.message.response import Response
class PiholeStatusResponse(Response):
def __init__(self,
server: str,
status: str,
ads_percentage: float,
blocked: int,
cached: int,
domain_count: int,
forwarded: int,
queries: int,
total_clients: int,
total_queries: int,
unique_clients: int,
unique_domains: int,
version: str,
*args,
**kwargs):
super().__init__(*args, output={
'server': server,
'status': status,
'ads_percentage': ads_percentage,
'blocked': blocked,
'cached': cached,
'domain_count': domain_count,
'forwarded': forwarded,
'queries': queries,
'total_clients': total_clients,
'total_queries': total_queries,
'unique_clients': unique_clients,
'unique_domains': unique_domains,
'version': version,
}, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -8,14 +8,16 @@ class GraphitePlugin(Plugin):
Plugin for sending data to a Graphite instance.
"""
def __init__(self, host: str = 'localhost', port: int = 2003, **kwargs):
def __init__(self, host: str = 'localhost', port: int = 2003, timeout: int = 5, **kwargs):
"""
:param host: Default Graphite host (default: 'localhost').
:param port: Default Graphite port (default: 2003).
:param timeout: Communication timeout in seconds (default: 5).
"""
super().__init__(**kwargs)
self.host = host
self.port = port
self.timeout = timeout
@action
def send(self,
@ -23,6 +25,7 @@ class GraphitePlugin(Plugin):
value,
host: Optional[str] = None,
port: Optional[int] = None,
timeout: Optional[int] = None,
tags: Optional[Dict[str, str]] = None,
prefix: str = '',
protocol: str = 'tcp'):
@ -44,7 +47,7 @@ class GraphitePlugin(Plugin):
tags = tags or {}
protocol = protocol.lower()
graphyte.init(host, port=port, prefix=prefix, protocol=protocol)
graphyte.init(host, port=port, prefix=prefix, protocol=protocol, timeout=timeout or self.timeout)
graphyte.send(metric, value, tags=tags)

308
platypush/plugins/pihole.py Normal file
View file

@ -0,0 +1,308 @@
import hashlib
import requests
from enum import Enum
from typing import Optional, Union, List
from platypush.message.response.pihole import PiholeStatusResponse
from platypush.plugins import Plugin, action
class PiholeStatus(Enum):
ENABLED = 'enabled'
DISABLED = 'disabled'
class PiholePlugin(Plugin):
"""
Plugin for interacting with a `Pi-Hole <https://pi-hole.net>`_ DNS server for advertisement and content blocking.
"""
def __init__(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None,
ssl: bool = False, verify_ssl: bool = True, **kwargs):
"""
:param server: Default Pi-hole server IP address.
:param password: Password for the default Pi-hole server.
:param api_key: Alternatively to the password, you can also provide the server ``api_key``, as retrieved from
http://pi-hole-server/admin/scripts/pi-hole/php/api_token.php
:param ssl: Set to true if the host uses HTTPS (default: False).
:param verify_ssl: Set to False to disable SSL certificate check.
"""
super().__init__(**kwargs)
self.server = server
self.password = password
self.api_key = api_key
self.ssl = ssl
self.verify_ssl = verify_ssl
@staticmethod
def _get_token(password: Optional[str] = None, api_key: Optional[str] = None) -> str:
if not password:
return api_key or ''
return hashlib.sha256(hashlib.sha256(str(password).encode()).hexdigest().encode()).hexdigest()
def _get_url(self, name: str, server: Optional[str] = None, password: Optional[str] = None,
ssl: Optional[bool] = None, api_key: Optional[str] = None, **kwargs) -> str:
if not server:
server = self.server
password = password or self.password
api_key = api_key or self.api_key
ssl = ssl if ssl is not None else self.ssl
args = '&'.join(['{key}={value}'.format(key=key, value=value) for key, value in kwargs.items()
if value is not None])
if args:
args = '&' + args
token = self._get_token(password=password, api_key=api_key)
if token:
token = '&auth=' + token
return 'http{ssl}://{server}/admin/api.php?{name}{token}{args}'.format(
ssl='s' if ssl else '', server=server, name=name, token=token, args=args)
@staticmethod
def _normalize_number(n: Union[str, int]):
return int(str(n).replace(',', ''))
@action
def status(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None,
ssl: bool = None) \
-> PiholeStatusResponse:
"""
Get the status and statistics of a running Pi-hole server.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
:return: :class:`platypush.message.response.pihole.PiholeStatusResponse`
"""
status = requests.get(self._get_url('summary', server=server, password=password, api_key=api_key,
ssl=ssl), verify=self.verify_ssl).json()
version = requests.get(self._get_url('versions', server=server, password=password, api_key=api_key,
ssl=ssl), verify=self.verify_ssl).json()
return PiholeStatusResponse(
server=server or self.server,
status=PiholeStatus(status.get('status')).value,
ads_percentage=float(status.get('ads_percentage_today')),
blocked=self._normalize_number(status.get('ads_blocked_today')),
cached=self._normalize_number(status.get('queries_cached')),
domain_count=self._normalize_number(status.get('domains_being_blocked')),
forwarded=self._normalize_number(status.get('queries_forwarded')),
queries=self._normalize_number(status.get('dns_queries_today')),
total_clients=self._normalize_number(status.get('clients_ever_seen')),
total_queries=self._normalize_number(status.get('dns_queries_all_types')),
unique_clients=self._normalize_number(status.get('unique_clients')),
unique_domains=self._normalize_number(status.get('unique_domains')),
version=version.get('core_current'),
)
@action
def enable(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None,
ssl: bool = None):
"""
Enable a Pi-hole server.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
response = requests.get(self._get_url('enable', server=server, password=password, api_key=api_key, ssl=ssl),
verify=self.verify_ssl)
try:
status = (response.json() or {}).get('status')
except Exception as e:
raise AssertionError('Could not enable the server: {}'.format(response.text or str(e)))
assert status == 'enabled', 'Could not enable the server: Wrong credentials'
return response.json()
@action
def disable(self, server: Optional[str] = None, password: Optional[str] = None, api_key: Optional[str] = None,
seconds: Optional[int] = None, ssl: bool = None):
"""
Disable a Pi-hole server.
:param seconds: How long the server will be disabled in seconds (default: None, indefinitely).
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
if seconds:
response = requests.get(self._get_url('', server=server, password=password, api_key=api_key,
ssl=ssl, disable=seconds), verify=self.verify_ssl)
else:
response = requests.get(self._get_url('disable', server=server, password=password, api_key=api_key,
ssl=ssl), verify=self.verify_ssl)
try:
status = (response.json() or {}).get('status')
except Exception as e:
raise AssertionError('Could not disable the server: {}'.format(response.text or str(e)))
assert status == 'disabled', 'Could not disable the server: Wrong credentials'
return response.json()
def _list_manage(self, domain: str, list_name: str, endpoint: str, server: Optional[str] = None,
password: Optional[str] = None, api_key: Optional[str] = None, ssl: bool = None):
data = {
'list': list_name,
'domain': domain
}
if password or self.password:
data['pw'] = password or self.password
elif api_key or self.api_key:
data['auth'] = api_key or self.api_key
base_url = "http{ssl}://{host}/admin/scripts/pi-hole/php/{endpoint}.php".format(
ssl='s' if ssl or self.ssl else '', host=server or self.server, endpoint=endpoint
)
with requests.session() as s:
s.get(base_url, verify=self.verify_ssl)
response = requests.post(base_url, data=data, verify=self.verify_ssl).text.strip()
return {'response': response}
def _list_get(self, list_name: str, server: Optional[str] = None, ssl: bool = None) -> List[str]:
response = requests.get("http{ssl}://{host}/admin/scripts/pi-hole/php/get.php?list={list}".format(
ssl='s' if ssl or self.ssl else '', host=server or self.server, list=list_name
), verify=self.verify_ssl).json()
ret = set()
for ll in response:
ret.update(ll)
return list(ret)
@action
def get_blacklist(self, server: Optional[str] = None, ssl: bool = None) -> List[str]:
"""
Get the content of the blacklist.
:param server: Server IP address (default: default configured ``server`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_get(list_name='black', server=server, ssl=ssl)
@action
def get_whitelist(self, server: Optional[str] = None, ssl: bool = None) -> List[str]:
"""
Get the content of the whitelist.
:param server: Server IP address (default: default configured ``server`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_get(list_name='white', server=server, ssl=ssl)
@action
def get_list(self, list_name: str, server: Optional[str] = None, ssl: bool = None) -> List[str]:
"""
Get the content of a list stored on the server.
:param list_name: List name
:param server: Server IP address (default: default configured ``server`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_get(list_name=list_name, server=server, ssl=ssl)
@action
def blacklist_add(self, domain: str, server: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, ssl: bool = None):
"""
Add a domain to the blacklist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(domain=domain, list_name='black', endpoint='add', server=server, password=password,
api_key=api_key, ssl=ssl)
@action
def blacklist_remove(self, domain: str, server: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, ssl: bool = None):
"""
Remove a domain from the blacklist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(domain=domain, list_name='black', endpoint='sub', server=server, password=password,
api_key=api_key, ssl=ssl)
@action
def whitelist_add(self, domain: str, server: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, ssl: bool = None):
"""
Add a domain to the whitelist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(domain=domain, list_name='white', endpoint='add', server=server, password=password,
api_key=api_key, ssl=ssl)
@action
def whitelist_remove(self, domain: str, server: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, ssl: bool = None):
"""
Remove a domain from the whitelist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(domain=domain, list_name='white', endpoint='sub', server=server, password=password,
api_key=api_key, ssl=ssl)
@action
def list_add(self, list_name: str, domain: str, server: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, ssl: bool = None):
"""
Add a domain to a custom list stored on the server.
:param list_name: List name
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(domain=domain, list_name=list_name, endpoint='add', server=server, password=password,
api_key=api_key, ssl=ssl)
@action
def list_remove(self, list_name: str, domain: str, server: Optional[str] = None, password: Optional[str] = None,
api_key: Optional[str] = None, ssl: bool = None):
"""
Remove a domain from a custom list stored on the server.
:param list_name: List name
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(domain=domain, list_name=list_name, endpoint='sub', server=server, password=password,
api_key=api_key, ssl=ssl)
# vim:sw=4:ts=4:et: