Removed dependency from prctl.

Also, black'd and LINT-fixed some files that hadn't been touched in a
while.
This commit is contained in:
Fabio Manganiello 2023-07-23 19:04:01 +02:00
parent 04b759e4d5
commit 0dc380fa94
Signed by: blacklight
GPG key ID: D90FBA7F76362774
14 changed files with 270 additions and 199 deletions

View file

@ -23,7 +23,7 @@ from .message.event import Event
from .message.event.application import ApplicationStartedEvent
from .message.request import Request
from .message.response import Response
from .utils import set_thread_name, get_enabled_plugins
from .utils import get_enabled_plugins
__author__ = 'Fabio Manganiello <fabio@manganiello.tech>'
__version__ = '0.50.3'
@ -252,7 +252,6 @@ class Daemon:
if not self.no_capture_stderr:
sys.stderr = Logger(log.warning)
set_thread_name('platypush')
log.info('---- Starting platypush v.%s', __version__)
# Initialize the backends and link them to the bus

View file

@ -18,7 +18,6 @@ from platypush.utils import (
set_timeout,
clear_timeout,
get_redis_queue_name_by_message,
set_thread_name,
get_backend_name_by_class,
)
@ -81,7 +80,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self._request_context = kwargs['_req_ctx'] if '_req_ctx' in kwargs else None
if 'logging' in kwargs:
self.logger.setLevel(getattr(logging, kwargs.get('logging').upper()))
self.logger.setLevel(getattr(logging, kwargs['logging'].upper()))
def on_message(self, msg):
"""
@ -95,21 +94,23 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
"""
msg = Message.build(msg)
if msg is None:
return
if not getattr(msg, 'target', None) or msg.target != self.device_id:
return # Not for me
self.logger.debug(
'Message received on the {} backend: {}'.format(
self.__class__.__name__, msg
)
'Message received on the %s backend: %s', self.__class__.__name__, msg
)
if self._is_expected_response(msg):
# Expected response, trigger the response handler
clear_timeout()
if self._request_context:
# pylint: disable=unsubscriptable-object
self._request_context['on_response'](msg)
self.stop()
return
@ -117,8 +118,10 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self.bus.post(msg)
def _is_expected_response(self, msg):
"""Internal only - returns true if we are expecting for a response
and msg is that response"""
"""
Internal only - returns true if we are expecting for a response and msg
is that response.
"""
# pylint: disable=unsubscriptable-object
return (
@ -131,12 +134,12 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
config_name = (
'backend.' + self.__class__.__name__.split('Backend', maxsplit=1)[0].lower()
)
return Config.get(config_name)
return Config.get(config_name) or {}
def _setup_response_handler(self, request, on_response, response_timeout):
def _timeout_hndl():
raise RuntimeError(
'Timed out while waiting for a response from {}'.format(request.target)
f'Timed out while waiting for a response from {request.target}'
)
req_ctx = {
@ -177,7 +180,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
request,
on_response=None,
response_timeout=_default_response_timeout,
**kwargs
**kwargs,
):
"""
Send a request message on the backend.
@ -237,9 +240,10 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
except KeyError:
self.logger.warning(
(
"Backend {} does not implement send_message "
"Backend %s does not implement send_message "
"and the fallback Redis backend isn't configured"
).format(self.__class__.__name__)
),
self.__class__.__name__,
)
return
@ -248,7 +252,6 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
def run(self):
"""Starts the backend thread. To be implemented in the derived classes if the loop method isn't defined."""
self.thread_id = get_ident()
set_thread_name(self._thread_name)
if not callable(self.loop):
return
@ -272,21 +275,19 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
time.sleep(5)
except Exception as e:
self.logger.error(
'{} initialization error: {}'.format(
self.__class__.__name__, str(e)
)
'%s initialization error: %s', self.__class__.__name__, e
)
self.logger.exception(e)
time.sleep(self.poll_seconds or 5)
def __enter__(self):
"""Invoked when the backend is initialized, if the main logic is within a ``loop()`` function"""
self.logger.info('Initialized backend {}'.format(self.__class__.__name__))
self.logger.info('Initialized backend %s', self.__class__.__name__)
def __exit__(self, exc_type, exc_val, exc_tb):
def __exit__(self, *_, **__):
"""Invoked when the backend is terminated, if the main logic is within a ``loop()`` function"""
self.on_stop()
self.logger.info('Terminated backend {}'.format(self.__class__.__name__))
self.logger.info('Terminated backend %s', self.__class__.__name__)
def on_stop(self):
"""Callback invoked when the process stops"""
@ -324,9 +325,14 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
return redis
def get_message_response(self, msg):
queue = get_redis_queue_name_by_message(msg)
if not queue:
self.logger.warning('No response queue configured for the message')
return None
try:
redis = self._get_redis()
response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60)
response = redis.blpop(queue, timeout=60)
if response and len(response) > 1:
response = Message.build(response[1])
else:
@ -334,9 +340,9 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
return response
except Exception as e:
self.logger.error(
'Error while processing response to {}: {}'.format(msg, str(e))
)
self.logger.error('Error while processing response to %s: %s', msg, e)
return None
@staticmethod
def _get_ip() -> str:
@ -395,18 +401,9 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
}
name = name or re.sub(r'Backend$', '', self.__class__.__name__).lower()
srv_type = srv_type or '_platypush-{name}._{proto}.local.'.format(
name=name, proto='udp' if udp else 'tcp'
)
srv_name = srv_name or '{host}.{type}'.format(
host=self.device_id, type=srv_type
)
if port:
srv_port = port
else:
srv_port = self.port if hasattr(self, 'port') else None
srv_type = srv_type or f'_platypush-{name}._{"udp" if udp else "tcp"}.local.'
srv_name = srv_name or f'{self.device_id}.{srv_type}'
srv_port = port if port else getattr(self, 'port', None)
self.zeroconf_info = ServiceInfo(
srv_type,
srv_name,
@ -439,9 +436,10 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest):
self.zeroconf.unregister_service(self.zeroconf_info)
except Exception as e:
self.logger.warning(
'Could not register Zeroconf service {}: {}: {}'.format(
self.zeroconf_info.name, type(e).__name__, str(e)
)
'Could not register Zeroconf service %s: %s: %s',
self.zeroconf_info.name,
type(e).__name__,
e,
)
if self.zeroconf:

View file

@ -5,7 +5,6 @@ from typing import Optional, List
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
from platypush.utils import set_thread_name
class GooglePubsubBackend(Backend):
@ -25,7 +24,9 @@ class GooglePubsubBackend(Backend):
"""
def __init__(self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs):
def __init__(
self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs
):
"""
:param topics: List of topics to subscribe. You can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
@ -35,7 +36,7 @@ class GooglePubsubBackend(Backend):
``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
"""
super().__init__(*args, **kwargs)
super().__init__(*args, name='GooglePubSub', **kwargs)
self.topics = topics
if credentials_file:
@ -46,7 +47,9 @@ class GooglePubsubBackend(Backend):
@staticmethod
def _get_plugin():
return get_plugin('google.pubsub')
plugin = get_plugin('google.pubsub')
assert plugin, 'google.pubsub plugin not enabled'
return plugin
def _message_callback(self, topic):
def callback(msg):
@ -54,7 +57,7 @@ class GooglePubsubBackend(Backend):
try:
data = json.loads(data)
except Exception as e:
self.logger.debug('Not a valid JSON: {}: {}'.format(data, str(e)))
self.logger.debug('Not a valid JSON: %s: %s', data, e)
msg.ack()
self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
@ -64,20 +67,23 @@ class GooglePubsubBackend(Backend):
def run(self):
# noinspection PyPackageRequirements
from google.cloud import pubsub_v1
# noinspection PyPackageRequirements
from google.api_core.exceptions import AlreadyExists
super().run()
set_thread_name('GooglePubSub')
plugin = self._get_plugin()
project_id = plugin.get_project_id()
credentials = plugin.get_credentials(plugin.subscriber_audience)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
for topic in self.topics:
if not topic.startswith('projects/{}/topics/'.format(project_id)):
topic = 'projects/{}/topics/{}'.format(project_id, topic)
subscription_name = '/'.join([*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]])
prefix = f'projects/{project_id}/topics/'
if not topic.startswith(prefix):
topic = f'{prefix}{topic}'
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
)
try:
subscriber.create_subscription(name=subscription_name, topic=topic)

View file

@ -1,28 +1,36 @@
import logging
import re
import requests
from threading import Thread
import time
import requests
from frozendict import frozendict
from threading import Thread
from platypush.message.event.http import HttpEvent
from platypush.utils import set_thread_name
class HttpRequest(object):
class HttpRequest:
"""
Backend used for polling HTTP resources.
"""
poll_seconds = 60
timeout = 5
class HttpRequestArguments(object):
def __init__(self, url, method='get', *args, **kwargs):
class HttpRequestArguments:
"""
Models the properties of an HTTP request.
"""
def __init__(self, url, *args, method='get', **kwargs):
self.method = method.lower()
self.url = url
self.args = args
self.kwargs = kwargs
def __init__(self, args, bus=None, poll_seconds=None, timeout=None,
skip_first_call=True, **kwargs):
def __init__(
self, args, bus=None, poll_seconds=None, timeout=None, skip_first_call=True, **_
):
super().__init__()
self.poll_seconds = poll_seconds or self.poll_seconds
@ -43,12 +51,13 @@ class HttpRequest(object):
self.args.kwargs['timeout'] = self.timeout
self.request_args = {
'method': self.args.method, 'url': self.args.url, **self.args.kwargs
'method': self.args.method,
'url': self.args.url,
**self.args.kwargs,
}
def execute(self):
def _thread_func():
set_thread_name('HttpPoll')
is_first_call = self.last_request_timestamp == 0
self.last_request_timestamp = time.time()
@ -63,30 +72,45 @@ class HttpRequest(object):
else:
event = HttpEvent(dict(self), new_items)
if new_items and self.bus:
if not self.skip_first_call or (
self.skip_first_call and not is_first_call):
if (
new_items
and self.bus
and (
not self.skip_first_call
or (self.skip_first_call and not is_first_call)
)
):
self.bus.post(event)
response.raise_for_status()
except Exception as e:
self.logger.exception(e)
self.logger.warning('Encountered an error while retrieving {}: {}'.
format(self.args.url, str(e)))
self.logger.warning(
'Encountered an error while retrieving %s: %s', self.args.url, e
)
Thread(target=_thread_func, name='HttpPoll').start()
def get_new_items(self, response):
"""Gets new items out of a response"""
raise NotImplementedError("get_new_items must be implemented in a derived class")
raise NotImplementedError(
"get_new_items must be implemented in a derived class"
)
def __iter__(self):
for (key, value) in self.request_args.items():
"""
:return: The ``request_args`` as key-value pairs.
"""
for key, value in self.request_args.items():
yield key, value
class JsonHttpRequest(HttpRequest):
def __init__(self, path=None, *args, **kwargs):
"""
Specialization of the HttpRequest class for JSON requests.
"""
def __init__(self, *args, path=None, **kwargs):
super().__init__(*args, **kwargs)
self.path = path
self.seen_entries = set()
@ -97,7 +121,8 @@ class JsonHttpRequest(HttpRequest):
if self.path:
m = re.match(r'\${\s*(.*)\s*}', self.path)
response = eval(m.group(1))
if m:
response = eval(m.group(1)) # pylint: disable=eval-used
for entry in response:
flattened_entry = deep_freeze(entry)
@ -109,6 +134,11 @@ class JsonHttpRequest(HttpRequest):
def deep_freeze(x):
"""
Deep freezes a Python object - works for strings, dictionaries, sets and
iterables.
"""
if isinstance(x, str) or not hasattr(x, "__len__"):
return x
if hasattr(x, "keys") and hasattr(x, "values"):
@ -118,4 +148,5 @@ def deep_freeze(x):
return frozenset(map(deep_freeze, x))
# vim:sw=4:ts=4:et:

View file

@ -5,11 +5,17 @@ import threading
import time
from platypush.backend import Backend
from platypush.utils import set_thread_name
from platypush.message.event.music.snapcast import ClientVolumeChangeEvent, \
GroupMuteChangeEvent, ClientConnectedEvent, ClientDisconnectedEvent, \
ClientLatencyChangeEvent, ClientNameChangeEvent, GroupStreamChangeEvent, \
StreamUpdateEvent, ServerUpdateEvent
from platypush.message.event.music.snapcast import (
ClientVolumeChangeEvent,
GroupMuteChangeEvent,
ClientConnectedEvent,
ClientDisconnectedEvent,
ClientLatencyChangeEvent,
ClientNameChangeEvent,
GroupStreamChangeEvent,
StreamUpdateEvent,
ServerUpdateEvent,
)
class MusicSnapcastBackend(Backend):
@ -34,8 +40,14 @@ class MusicSnapcastBackend(Backend):
_DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds
_SOCKET_EOL = '\r\n'.encode()
def __init__(self, hosts=None, ports=None,
poll_seconds=_DEFAULT_POLL_SECONDS, *args, **kwargs):
def __init__(
self,
hosts=None,
ports=None,
poll_seconds=_DEFAULT_POLL_SECONDS,
*args,
**kwargs,
):
"""
:param hosts: List of Snapcast server names or IPs to monitor (default:
`['localhost']`
@ -72,24 +84,25 @@ class MusicSnapcastBackend(Backend):
if self._socks.get(host):
return self._socks[host]
self.logger.debug('Connecting to {host}:{port}'.format(host=host, port=port))
self.logger.debug('Connecting to %s:%d', host, port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
self._socks[host] = sock
self.logger.info('Connected to {host}:{port}'.format(host=host, port=port))
self.logger.info('Connected to %s:%d', host, port)
return sock
def _disconnect(self, host, port):
sock = self._socks.get(host)
if not sock:
self.logger.debug('Not connected to {}:{}'.format(host, port))
self.logger.debug('Not connected to %s:%d', host, port)
return
try:
sock.close()
except Exception as e:
self.logger.warning(('Exception while disconnecting from {host}:{port}: {error}'.
format(host=host, port=port, error=str(e))))
self.logger.warning(
'Exception while disconnecting from %s:%d: %s', host, port, e
)
finally:
self._socks[host] = None
@ -103,7 +116,7 @@ class MusicSnapcastBackend(Backend):
if ready[0]:
buf += sock.recv(1)
else:
return
return None
return json.loads(buf.decode().strip())
@ -115,8 +128,9 @@ class MusicSnapcastBackend(Backend):
client_id = msg.get('params', {}).get('id')
volume = msg.get('params', {}).get('volume', {}).get('percent')
muted = msg.get('params', {}).get('volume', {}).get('muted')
evt = ClientVolumeChangeEvent(host=host, client=client_id,
volume=volume, muted=muted)
evt = ClientVolumeChangeEvent(
host=host, client=client_id, volume=volume, muted=muted
)
elif msg.get('method') == 'Group.OnMute':
group_id = msg.get('params', {}).get('id')
muted = msg.get('params', {}).get('mute')
@ -149,10 +163,8 @@ class MusicSnapcastBackend(Backend):
return evt
def _client(self, host, port, thread_name):
def _client(self, host, port):
def _thread():
set_thread_name(thread_name)
while not self.should_stop():
try:
sock = self._connect(host, port)
@ -164,16 +176,20 @@ class MusicSnapcastBackend(Backend):
msgs = [msgs]
for msg in msgs:
self.logger.debug('Received message on {host}:{port}: {msg}'.
format(host=host, port=port, msg=msg))
self.logger.debug(
'Received message on {host}:{port}: {msg}'.format(
host=host, port=port, msg=msg
)
)
# noinspection PyTypeChecker
evt = self._parse_msg(host=host, msg=msg)
if evt:
self.bus.post(evt)
except Exception as e:
self.logger.warning('Exception while getting the status ' + 'of the Snapcast server {}:{}: {}'.
format(host, port, str(e)))
self.logger.warning(
'Exception while getting the status '
+ 'of the Snapcast server {}:{}: {}'.format(host, port, str(e))
)
self._disconnect(host, port)
finally:
@ -184,17 +200,19 @@ class MusicSnapcastBackend(Backend):
def run(self):
super().run()
self.logger.info('Initialized Snapcast backend - hosts: {} ports: {}'.
format(self.hosts, self.ports))
self.logger.info(
'Initialized Snapcast backend - hosts: {} ports: {}'.format(
self.hosts, self.ports
)
)
while not self.should_stop():
for i, host in enumerate(self.hosts):
port = self.ports[i]
thread_name = 'Snapcast-{host}-{port}'.format(host=host, port=port)
thread_name = f'Snapcast-{host}-{port}'
self._threads[host] = threading.Thread(
target=self._client(host, port, thread_name),
name=thread_name
target=self._client(host, port), name=thread_name
)
self._threads[host].start()
@ -211,8 +229,11 @@ class MusicSnapcastBackend(Backend):
try:
sock.close()
except Exception as e:
self.logger.warning('Could not close Snapcast connection to {}: {}: {}'.format(
host, type(e), str(e)))
self.logger.warning(
'Could not close Snapcast connection to {}: {}: {}'.format(
host, type(e), str(e)
)
)
# vim:sw=4:ts=4:et:

View file

@ -6,7 +6,6 @@ from typing import Optional
from platypush.backend import Backend
from platypush.message import Message
from platypush.utils import set_thread_name
class TcpBackend(Backend):
@ -17,19 +16,20 @@ class TcpBackend(Backend):
# Maximum length of a request to be processed
_MAX_REQ_SIZE = 2048
def __init__(self, port, bind_address=None, listen_queue=5, *args, **kwargs):
def __init__(self, port, bind_address=None, listen_queue=5, **kwargs):
"""
:param port: TCP port number
:type port: int
:param bind_address: Specify a bind address if you want to hook the service to a specific interface (default: listen for any connections)
:param bind_address: Specify a bind address if you want to hook the
service to a specific interface (default: listen for any connections).
:type bind_address: str
:param listen_queue: Maximum number of queued connections (default: 5)
:type listen_queue: int
"""
super().__init__(*args, **kwargs)
super().__init__(name=self.__class__.__name__, **kwargs)
self.port = port
self.bind_address = bind_address or '0.0.0.0'
@ -46,8 +46,11 @@ class TcpBackend(Backend):
while not self.should_stop():
if processed_bytes > self._MAX_REQ_SIZE:
self.logger.warning('Ignoring message longer than {} bytes from {}'
.format(self._MAX_REQ_SIZE, address[0]))
self.logger.warning(
'Ignoring message longer than {} bytes from {}'.format(
self._MAX_REQ_SIZE, address[0]
)
)
return
ch = sock.recv(1)
@ -76,17 +79,16 @@ class TcpBackend(Backend):
return
msg = Message.build(msg)
self.logger.info('Received request from {}: {}'.format(msg, address[0]))
self.logger.info('Received request from %s: %s', msg, address[0])
self.on_message(msg)
response = self.get_message_response(msg)
self.logger.info('Processing response on the TCP backend: {}'.format(response))
self.logger.info('Processing response on the TCP backend: %s', response)
if response:
sock.send(str(response).encode())
def _f_wrapper():
set_thread_name('TCPListener')
try:
_f()
finally:
@ -111,11 +113,16 @@ class TcpBackend(Backend):
serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv_sock.settimeout(0.5)
self.logger.info('Initialized TCP backend on port {} with bind address {}'.
format(self.port, self.bind_address))
self.logger.info(
'Initialized TCP backend on port %s with bind address %s',
self.port,
self.bind_address,
)
serv_sock.listen(self.listen_queue)
self._srv = multiprocessing.Process(target=self._accept_process, args=(serv_sock,))
self._srv = multiprocessing.Process(
target=self._accept_process, args=(serv_sock,)
)
self._srv.start()
while not self.should_stop():
@ -124,7 +131,7 @@ class TcpBackend(Backend):
except (socket.timeout, queue.Empty):
continue
self.logger.info('Accepted connection from client {}'.format(address[0]))
self.logger.info('Accepted connection from client %s', address[0])
self._process_client(sock, address)
if self._srv:

View file

@ -8,7 +8,7 @@ import croniter
from dateutil.tz import gettz
from platypush.procedure import Procedure
from platypush.utils import is_functional_cron, set_thread_name
from platypush.utils import is_functional_cron
logger = logging.getLogger('platypush:cron')
@ -52,7 +52,7 @@ class Cronjob(threading.Thread):
"""
def __init__(self, name, cron_expression, actions):
super().__init__()
super().__init__(name=f'cron:{name}')
self.cron_expression = cron_expression
self.name = name
self.state = CronjobState.IDLE
@ -79,7 +79,6 @@ class Cronjob(threading.Thread):
"""
Inner logic of the cronjob thread.
"""
set_thread_name(f'cron:{self.name}')
# Wait until an event is received or the next execution slot is reached
self.wait()
@ -203,7 +202,7 @@ class CronScheduler(threading.Thread):
logger.info('Running cron scheduler')
while not self.should_stop():
for (job_name, job_config) in self.jobs_config.items():
for job_name, job_config in self.jobs_config.items():
job = self._get_job(name=job_name, config=job_config)
if job.state == CronjobState.IDLE:
try:

View file

@ -5,7 +5,6 @@ from typing import Dict, Optional
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent
from platypush.utils import set_thread_name
from platypush.entities._base import EntityKey, EntitySavedCallback
from platypush.entities._engine.queue import EntitiesQueue
@ -99,7 +98,6 @@ class EntitiesEngine(Thread):
def run(self):
super().run()
set_thread_name('entities')
self.logger.info('Started entities engine')
self._running.set()

View file

@ -3,20 +3,23 @@ import json
import logging
import threading
from functools import wraps
from typing import Optional, Type
from platypush.common import exec_wrapper
from platypush.config import Config
from platypush.message.event import Event
from platypush.message.request import Request
from platypush.procedure import Procedure
from platypush.utils import get_event_class_by_type, set_thread_name, is_functional_hook
from platypush.utils import get_event_class_by_type, is_functional_hook
logger = logging.getLogger('platypush')
def parse(msg):
"""Builds a dict given another dictionary or
a JSON UTF-8 encoded string/bytearray"""
"""
Builds a dict given another dictionary or a JSON UTF-8 encoded
string/bytearray.
"""
if isinstance(msg, (bytes, bytearray)):
msg = msg.decode('utf-8')
@ -24,16 +27,18 @@ def parse(msg):
try:
msg = json.loads(msg.strip())
except json.JSONDecodeError:
logger.warning('Invalid JSON message: {}'.format(msg))
logger.warning('Invalid JSON message: %s', msg)
return None
return msg
# pylint: disable=too-few-public-methods
class EventCondition:
"""Event hook condition class"""
def __init__(self, type=Event.__class__, priority=None, **kwargs):
# pylint: disable=redefined-builtin
def __init__(self, type: Optional[Type[Event]] = None, priority=None, **kwargs):
"""
Rule constructor.
Params:
@ -42,40 +47,40 @@ class EventCondition:
or recognized_phrase='Your phrase')
"""
self.type = type
self.type = type or Event.__class__ # type: ignore
self.args = {}
self.parsed_args = {}
self.priority = priority
for (key, value) in kwargs.items():
# TODO So far we only allow simple value match. If value is a dict
# instead, we should allow more a sophisticated attribute matching,
# e.g. or conditions, in, and other operators.
for key, value in kwargs.items():
self.args[key] = value
@classmethod
def build(cls, rule):
"""Builds a rule given either another EventRule, a dictionary or
a JSON UTF-8 encoded string/bytearray"""
"""
Builds a rule given either another EventRule, a dictionary or a JSON
UTF-8 encoded string/bytearray.
"""
if isinstance(rule, cls):
return rule
else:
rule = parse(rule)
rule = parse(rule)
assert isinstance(rule, dict), f'Not a valid rule: {rule}'
type = get_event_class_by_type(rule.pop('type') if 'type' in rule else 'Event')
args = {}
for (key, value) in rule.items():
for key, value in rule.items():
args[key] = value
return cls(type=type, **args)
class EventAction(Request):
"""Event hook action class. It is a special type of runnable request
whose fields can be configured later depending on the event context"""
"""
Event hook action class. It is a special type of runnable request whose
fields can be configured later depending on the event context.
"""
def __init__(self, target=None, action=None, **args):
if target is None:
@ -84,30 +89,34 @@ class EventAction(Request):
super().__init__(target=target, action=action, **args_copy)
@classmethod
def build(cls, action):
action = super().parse(action)
action['origin'] = Config.get('device_id')
def build(cls, msg):
msg = super().parse(msg)
msg['origin'] = Config.get('device_id')
if 'target' not in action:
action['target'] = action['origin']
if 'target' not in msg:
msg['target'] = msg['origin']
token = Config.get('token')
if token:
action['token'] = token
msg['token'] = token
return super().build(action)
return super().build(msg)
class EventHook:
"""Event hook class. It consists of one conditions and
one or multiple actions to be executed"""
"""
Event hook class. It consists of one conditions and one or multiple actions
to be executed.
"""
def __init__(self, name, priority=None, condition=None, actions=None):
"""Constructor. Takes a name, a EventCondition object and an event action
procedure as input. It may also have a priority attached
as a positive number. If multiple hooks match against an event,
only the ones that have either the maximum match score or the
maximum pre-configured priority will be run."""
"""
Takes a name, a EventCondition object and an event action procedure as
input. It may also have a priority attached as a positive number. If
multiple hooks match against an event, only the ones that have either
the maximum match score or the maximum pre-configured priority will be
run.
"""
self.name = name
self.condition = EventCondition.build(condition or {})
@ -116,23 +125,27 @@ class EventHook:
self.condition.priority = self.priority
@classmethod
def build(cls, name, hook):
"""Builds a rule given either another EventRule, a dictionary or
a JSON UTF-8 encoded string/bytearray"""
def build(cls, name, hook): # pylint: disable=redefined-outer-name
"""
Builds a rule given either another EventRule, a dictionary or a JSON
UTF-8 encoded string/bytearray.
"""
if isinstance(hook, cls):
return hook
else:
hook = parse(hook)
hook = parse(hook)
if is_functional_hook(hook):
actions = Procedure(name=name, requests=[hook], _async=False)
return cls(name=name, condition=hook.condition, actions=actions)
return cls(
name=name, condition=getattr(hook, 'condition', None), actions=actions
)
assert isinstance(hook, dict)
condition = EventCondition.build(hook['if']) if 'if' in hook else None
actions = []
priority = hook['priority'] if 'priority' in hook else None
if condition:
condition.priority = priority
if 'then' in hook:
@ -145,29 +158,38 @@ class EventHook:
return cls(name=name, condition=condition, actions=actions, priority=priority)
def matches_event(self, event):
"""Returns an EventMatchResult object containing the information
about the match between the event and this hook"""
"""
Returns an EventMatchResult object containing the information about the
match between the event and this hook.
"""
return event.matches_condition(self.condition)
def run(self, event):
"""Checks the condition of the hook against a particular event and
runs the hook actions if the condition is met"""
"""
Checks the condition of the hook against a particular event and runs
the hook actions if the condition is met.
"""
def _thread_func(result):
set_thread_name('Event-' + self.name)
self.actions.execute(event=event, **result.parsed_args)
executor = getattr(self.actions, 'execute', None)
if executor and callable(executor):
executor(event=event, **result.parsed_args)
result = self.matches_event(event)
if result.is_match:
logger.info('Running hook {} triggered by an event'.format(self.name))
logger.info('Running hook %s triggered by an event', self.name)
threading.Thread(
target=_thread_func, name='Event-' + self.name, args=(result,)
).start()
def hook(event_type=Event, **condition):
"""
Decorator used for event hook functions.
"""
def wrapper(f):
f.hook = True
f.condition = EventCondition(type=event_type, **condition)

View file

@ -30,7 +30,7 @@ class Request(Message):
target,
action,
origin=None,
id=None,
id=None, # pylint: disable=redefined-builtin
backend=None,
args=None,
token=None,
@ -109,8 +109,6 @@ class Request(Message):
return proc.execute(*args, **kwargs)
def _expand_context(self, event_args=None, **context):
from platypush.config import Config
if event_args is None:
event_args = copy.deepcopy(self.args)
@ -138,16 +136,19 @@ class Request(Message):
return event_args
@classmethod
# pylint: disable=too-many-branches
def expand_value_from_context(cls, _value, **context):
for k, v in context.items():
if isinstance(v, Message):
v = json.loads(str(v))
try:
exec('{}={}'.format(k, v))
exec('{}={}'.format(k, v)) # pylint: disable=exec-used
except Exception:
if isinstance(v, str):
try:
exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v)))
exec( # pylint: disable=exec-used
'{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v))
)
except Exception as e2:
logger.debug(
'Could not set context variable %s=%s: %s', k, v, e2
@ -167,7 +168,7 @@ class Request(Message):
_value = m.group(4)
try:
context_value = eval(inner_expr)
context_value = eval(inner_expr) # pylint: disable=eval-used
if callable(context_value):
context_value = context_value()
@ -209,6 +210,7 @@ class Request(Message):
redis.send_message(queue_name, response)
redis.expire(queue_name, 60)
# pylint: disable=too-many-statements
def execute(self, n_tries=1, _async=True, **context):
"""
Execute this request and returns a Response object
@ -224,6 +226,7 @@ class Request(Message):
- group: ${group_name} # will be expanded as "Kitchen lights")
"""
# pylint: disable=too-many-branches
def _thread_func(_n_tries, errors=None):
from platypush.context import get_bus
from platypush.plugins import RunnablePlugin

View file

@ -24,7 +24,6 @@ from platypush.message.event.light import (
LightStatusChangeEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.utils import set_thread_name
class LightHuePlugin(RunnablePlugin, LightEntityManager):
@ -1054,7 +1053,6 @@ class LightHuePlugin(RunnablePlugin, LightEntityManager):
return self._animation_stop.is_set()
def _animate_thread(lights):
set_thread_name('HueAnimate')
get_bus().post(
LightAnimationStartedEvent(
lights=lights,
@ -1209,7 +1207,7 @@ class LightHuePlugin(RunnablePlugin, LightEntityManager):
def status(self, *_, **__) -> Iterable[LightEntity]:
lights = self.transform_entities(self._get_lights(publish_entities=True))
for light in lights:
light.id = light.external_id
light.id = light.external_id # type: ignore
for attr, value in (light.data or {}).items():
setattr(light, attr, value)

View file

@ -89,7 +89,7 @@ def get_plugin_class_by_name(plugin_name):
module = get_plugin_module_by_name(plugin_name)
if not module:
return
return None
class_name = getattr(
module, ''.join([_.capitalize() for _ in plugin_name.split('.')]) + 'Plugin'
@ -126,7 +126,7 @@ def get_backend_class_by_name(backend_name: str):
module = get_backend_module_by_name(backend_name)
if not module:
return
return None
class_name = getattr(
module,
@ -149,7 +149,7 @@ def get_backend_class_by_name(backend_name: str):
return None
def get_backend_name_by_class(backend) -> Optional[str]:
def get_backend_name_by_class(backend) -> str:
"""Gets the common name of a backend (e.g. "http" or "mqtt") given its class."""
from platypush.backend import Backend
@ -206,12 +206,14 @@ def get_decorators(cls, climb_class_hierarchy=False):
decorators = {}
# noinspection PyPep8Naming
def visit_FunctionDef(node):
for n in node.decorator_list:
if isinstance(n, ast.Call):
# noinspection PyUnresolvedReferences
name = n.func.attr if isinstance(n.func, ast.Attribute) else n.func.id
name = (
n.func.attr
if isinstance(n.func, ast.Attribute)
else n.func.id # type: ignore
)
else:
name = n.attr if isinstance(n, ast.Attribute) else n.id
@ -257,6 +259,7 @@ def _get_ssl_context(
else:
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
assert ssl_cert, 'No certificate specified'
if ssl_cafile or ssl_capath:
ssl_context.load_verify_locations(cafile=ssl_cafile, capath=ssl_capath)
@ -311,18 +314,6 @@ def get_ssl_client_context(
)
def set_thread_name(name: str):
"""
Set the name of the current thread.
"""
try:
import prctl
prctl.set_name(name) # pylint: disable=no-member
except ImportError:
logger.debug('Unable to set thread name: prctl module is missing')
def find_bins_in_path(bin_name):
"""
Search for a binary in the PATH variable.
@ -399,7 +390,6 @@ def get_mime_type(resource: str) -> Optional[str]:
offset = len('file://')
resource = resource[offset:]
# noinspection HttpUrlsUsage
if resource.startswith('http://') or resource.startswith('https://'):
with urllib.request.urlopen(resource) as response:
return response.info().get_content_type()
@ -442,6 +432,8 @@ def grouper(n, iterable, fillvalue=None):
for chunk in zip_longest(*args):
yield filter(None, chunk)
return
def is_functional_procedure(obj) -> bool:
"""
@ -529,7 +521,7 @@ def get_or_generate_jwt_rsa_key_pair():
"""
from platypush.config import Config
key_dir = os.path.join(Config.get('workdir'), 'jwt')
key_dir = os.path.join(Config.workdir, 'jwt')
priv_key_file = os.path.join(key_dir, 'id_rsa')
pub_key_file = priv_key_file + '.pub'

View file

@ -13,7 +13,6 @@ marshmallow_dataclass
paho-mqtt
python-dateutil
python-magic
python-prctl
pyyaml
redis
requests

View file

@ -83,8 +83,6 @@ setup(
'zeroconf>=0.27.0',
],
extras_require={
# Support for thread custom name
'threadname': ['python-prctl'],
# Support for Kafka backend and plugin
'kafka': ['kafka-python'],
# Support for Pushbullet backend and plugin