platypush/platypush/plugins/pihole/__init__.py

490 lines
16 KiB
Python

import hashlib
import requests
from typing import Any, Dict, Optional, Union, List
from platypush.schemas.pihole import PiholeStatusSchema
from platypush.plugins import Plugin, action
class PiholePlugin(Plugin):
"""
Plugin for interacting with a `Pi-Hole <https://pi-hole.net>`_ DNS server for advertisement and content blocking.
"""
def __init__(
self,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: bool = False,
verify_ssl: bool = True,
timeout: int = 10,
**kwargs,
):
"""
:param server: Default Pi-hole server IP address.
:param password: Password for the default Pi-hole server.
:param api_key: Alternatively to the password, you can also provide the server ``api_key``, as retrieved from
http://pi-hole-server/admin/scripts/pi-hole/php/api_token.php
:param ssl: Set to true if the host uses HTTPS (default: False).
:param verify_ssl: Set to False to disable SSL certificate check.
:param timeout: Default timeout for the HTTP requests.
"""
super().__init__(**kwargs)
self.server = server
self.password = password
self.api_key = api_key
self.ssl = ssl
self.verify_ssl = verify_ssl
self.timeout = timeout
@staticmethod
def _get_token(
password: Optional[str] = None, api_key: Optional[str] = None
) -> str:
if not password:
return api_key or ''
return hashlib.sha256(
hashlib.sha256(str(password).encode()).hexdigest().encode()
).hexdigest() # lgtm [py/weak-sensitive-data-hashing]
def _get_url(
self,
name: str,
server: Optional[str] = None,
password: Optional[str] = None,
ssl: Optional[bool] = None,
api_key: Optional[str] = None,
**kwargs,
) -> str:
if not server:
server = self.server
password = password or self.password
api_key = api_key or self.api_key
ssl = ssl if ssl is not None else self.ssl
args = '&'.join(
[f'{key}={value}' for key, value in kwargs.items() if value is not None]
)
if args:
args = '&' + args
token = self._get_token(password=password, api_key=api_key)
if token:
token = '&auth=' + token
return f'http{"s" if ssl else ""}://{server}/admin/api.php?{name}{token}{args}'
@staticmethod
def _normalize_number(n: Union[str, int]):
return int(str(n).replace(',', ''))
@action
def status(
self,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
) -> dict:
"""
Get the status and statistics of a running Pi-hole server.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
:return: .. schema:: pihole.PiholeStatusSchema
"""
status = requests.get(
self._get_url(
'summary', server=server, password=password, api_key=api_key, ssl=ssl
),
verify=self.verify_ssl,
timeout=self.timeout,
).json()
version = requests.get(
self._get_url(
'versions', server=server, password=password, api_key=api_key, ssl=ssl
),
verify=self.verify_ssl,
timeout=self.timeout,
).json()
return dict(
PiholeStatusSchema().dump(
{
'server': server or self.server,
'status': status.get('status'),
'ads_percentage': float(status.get('ads_percentage_today')),
'blocked': self._normalize_number(status.get('ads_blocked_today')),
'cached': self._normalize_number(status.get('queries_cached')),
'domain_count': self._normalize_number(
status.get('domains_being_blocked')
),
'forwarded': self._normalize_number(
status.get('queries_forwarded')
),
'queries': self._normalize_number(status.get('dns_queries_today')),
'total_clients': self._normalize_number(
status.get('clients_ever_seen')
),
'total_queries': self._normalize_number(
status.get('dns_queries_all_types')
),
'unique_clients': self._normalize_number(
status.get('unique_clients')
),
'unique_domains': self._normalize_number(
status.get('unique_domains')
),
'version': version.get('core_current'),
}
)
)
@action
def enable(
self,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Enable a Pi-hole server.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
response = requests.get(
self._get_url(
'enable', server=server, password=password, api_key=api_key, ssl=ssl
),
verify=self.verify_ssl,
timeout=self.timeout,
)
try:
status = (response.json() or {}).get('status')
assert status == 'enabled', 'Wrong credentials'
except Exception as e:
raise AssertionError(f'Could not enable the server: {response.text or e}')
return response.json()
@action
def disable(
self,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
seconds: Optional[int] = None,
ssl: Optional[bool] = None,
):
"""
Disable a Pi-hole server.
:param seconds: How long the server will be disabled in seconds (default: None, indefinitely).
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
if seconds:
response = requests.get(
self._get_url(
'',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
disable=seconds,
),
verify=self.verify_ssl,
timeout=self.timeout,
)
else:
response = requests.get(
self._get_url(
'disable',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
),
verify=self.verify_ssl,
timeout=self.timeout,
)
try:
status = (response.json() or {}).get('status')
assert status == 'disabled', 'Wrong credentials'
except Exception as e:
raise AssertionError(f'Could not disable the server: {response.text or e}')
return response.json()
def _list_manage(
self,
domain: str,
list_name: str,
endpoint: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
data: Dict[str, Any] = {'list': list_name, 'domain': domain}
if password or self.password:
data['pw'] = password or self.password
elif api_key or self.api_key:
data['auth'] = api_key or self.api_key
base_url = (
f"http{'s' if ssl or (ssl is None and self.ssl) else ''}://"
f"{server or self.server}/admin/scripts/pi-hole/php/{endpoint}.php"
)
with requests.session() as s:
s.get(base_url, verify=self.verify_ssl)
response = requests.post(
base_url, data=data, verify=self.verify_ssl, timeout=self.timeout
).text.strip()
return {'response': response}
def _list_get(
self, list_name: str, server: Optional[str] = None, ssl: Optional[bool] = None
) -> List[str]:
response = requests.get(
f"http{'s' if ssl or (ssl is None and self.ssl) else ''}"
f"://{server or self.server}/admin/scripts/pi-hole/php/"
f"get.php?list={list_name}",
verify=self.verify_ssl,
timeout=10,
).json()
ret = set()
for ll in response:
ret.update(ll)
return list(ret)
@action
def get_blacklist(
self, server: Optional[str] = None, ssl: Optional[bool] = None
) -> List[str]:
"""
Get the content of the blacklist.
:param server: Server IP address (default: default configured ``server`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_get(list_name='black', server=server, ssl=ssl)
@action
def get_whitelist(
self, server: Optional[str] = None, ssl: Optional[bool] = None
) -> List[str]:
"""
Get the content of the whitelist.
:param server: Server IP address (default: default configured ``server`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_get(list_name='white', server=server, ssl=ssl)
@action
def get_list(
self, list_name: str, server: Optional[str] = None, ssl: Optional[bool] = None
) -> List[str]:
"""
Get the content of a list stored on the server.
:param list_name: List name
:param server: Server IP address (default: default configured ``server`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_get(list_name=list_name, server=server, ssl=ssl)
@action
def blacklist_add(
self,
domain: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Add a domain to the blacklist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(
domain=domain,
list_name='black',
endpoint='add',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
)
@action
def blacklist_remove(
self,
domain: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Remove a domain from the blacklist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(
domain=domain,
list_name='black',
endpoint='sub',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
)
@action
def whitelist_add(
self,
domain: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Add a domain to the whitelist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(
domain=domain,
list_name='white',
endpoint='add',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
)
@action
def whitelist_remove(
self,
domain: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Remove a domain from the whitelist.
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(
domain=domain,
list_name='white',
endpoint='sub',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
)
@action
def list_add(
self,
list_name: str,
domain: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Add a domain to a custom list stored on the server.
:param list_name: List name
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(
domain=domain,
list_name=list_name,
endpoint='add',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
)
@action
def list_remove(
self,
list_name: str,
domain: str,
server: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
ssl: Optional[bool] = None,
):
"""
Remove a domain from a custom list stored on the server.
:param list_name: List name
:param domain: Domain name.
:param server: Server IP address (default: default configured ``server`` value).
:param password: Server password (default: default configured ``password`` value).
:param api_key: Server API key (default: default configured ``api_key`` value).
:param ssl: Set to True if the server uses SSL (default: False).
"""
return self._list_manage(
domain=domain,
list_name=list_name,
endpoint='sub',
server=server,
password=password,
api_key=api_key,
ssl=ssl,
)
# vim:sw=4:ts=4:et: