diff --git a/platypush/__init__.py b/platypush/__init__.py index 089c2950..c8731375 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -23,7 +23,7 @@ from .message.event import Event from .message.event.application import ApplicationStartedEvent from .message.request import Request from .message.response import Response -from .utils import set_thread_name, get_enabled_plugins +from .utils import get_enabled_plugins __author__ = 'Fabio Manganiello ' __version__ = '0.50.3' @@ -252,7 +252,6 @@ class Daemon: if not self.no_capture_stderr: sys.stderr = Logger(log.warning) - set_thread_name('platypush') log.info('---- Starting platypush v.%s', __version__) # Initialize the backends and link them to the bus diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 98163394..54aeadea 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -18,7 +18,6 @@ from platypush.utils import ( set_timeout, clear_timeout, get_redis_queue_name_by_message, - set_thread_name, get_backend_name_by_class, ) @@ -81,7 +80,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self._request_context = kwargs['_req_ctx'] if '_req_ctx' in kwargs else None if 'logging' in kwargs: - self.logger.setLevel(getattr(logging, kwargs.get('logging').upper())) + self.logger.setLevel(getattr(logging, kwargs['logging'].upper())) def on_message(self, msg): """ @@ -95,21 +94,23 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): """ msg = Message.build(msg) + if msg is None: + return if not getattr(msg, 'target', None) or msg.target != self.device_id: return # Not for me self.logger.debug( - 'Message received on the {} backend: {}'.format( - self.__class__.__name__, msg - ) + 'Message received on the %s backend: %s', self.__class__.__name__, msg ) if self._is_expected_response(msg): # Expected response, trigger the response handler clear_timeout() - # pylint: disable=unsubscriptable-object - self._request_context['on_response'](msg) + if self._request_context: + # pylint: disable=unsubscriptable-object + self._request_context['on_response'](msg) + self.stop() return @@ -117,8 +118,10 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self.bus.post(msg) def _is_expected_response(self, msg): - """Internal only - returns true if we are expecting for a response - and msg is that response""" + """ + Internal only - returns true if we are expecting for a response and msg + is that response. + """ # pylint: disable=unsubscriptable-object return ( @@ -131,12 +134,12 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): config_name = ( 'backend.' + self.__class__.__name__.split('Backend', maxsplit=1)[0].lower() ) - return Config.get(config_name) + return Config.get(config_name) or {} def _setup_response_handler(self, request, on_response, response_timeout): def _timeout_hndl(): raise RuntimeError( - 'Timed out while waiting for a response from {}'.format(request.target) + f'Timed out while waiting for a response from {request.target}' ) req_ctx = { @@ -177,7 +180,7 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): request, on_response=None, response_timeout=_default_response_timeout, - **kwargs + **kwargs, ): """ Send a request message on the backend. @@ -237,9 +240,10 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): except KeyError: self.logger.warning( ( - "Backend {} does not implement send_message " + "Backend %s does not implement send_message " "and the fallback Redis backend isn't configured" - ).format(self.__class__.__name__) + ), + self.__class__.__name__, ) return @@ -248,7 +252,6 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): def run(self): """Starts the backend thread. To be implemented in the derived classes if the loop method isn't defined.""" self.thread_id = get_ident() - set_thread_name(self._thread_name) if not callable(self.loop): return @@ -272,21 +275,19 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): time.sleep(5) except Exception as e: self.logger.error( - '{} initialization error: {}'.format( - self.__class__.__name__, str(e) - ) + '%s initialization error: %s', self.__class__.__name__, e ) self.logger.exception(e) time.sleep(self.poll_seconds or 5) def __enter__(self): """Invoked when the backend is initialized, if the main logic is within a ``loop()`` function""" - self.logger.info('Initialized backend {}'.format(self.__class__.__name__)) + self.logger.info('Initialized backend %s', self.__class__.__name__) - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, *_, **__): """Invoked when the backend is terminated, if the main logic is within a ``loop()`` function""" self.on_stop() - self.logger.info('Terminated backend {}'.format(self.__class__.__name__)) + self.logger.info('Terminated backend %s', self.__class__.__name__) def on_stop(self): """Callback invoked when the process stops""" @@ -324,9 +325,14 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): return redis def get_message_response(self, msg): + queue = get_redis_queue_name_by_message(msg) + if not queue: + self.logger.warning('No response queue configured for the message') + return None + try: redis = self._get_redis() - response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) + response = redis.blpop(queue, timeout=60) if response and len(response) > 1: response = Message.build(response[1]) else: @@ -334,9 +340,9 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): return response except Exception as e: - self.logger.error( - 'Error while processing response to {}: {}'.format(msg, str(e)) - ) + self.logger.error('Error while processing response to %s: %s', msg, e) + + return None @staticmethod def _get_ip() -> str: @@ -395,18 +401,9 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): } name = name or re.sub(r'Backend$', '', self.__class__.__name__).lower() - srv_type = srv_type or '_platypush-{name}._{proto}.local.'.format( - name=name, proto='udp' if udp else 'tcp' - ) - srv_name = srv_name or '{host}.{type}'.format( - host=self.device_id, type=srv_type - ) - - if port: - srv_port = port - else: - srv_port = self.port if hasattr(self, 'port') else None - + srv_type = srv_type or f'_platypush-{name}._{"udp" if udp else "tcp"}.local.' + srv_name = srv_name or f'{self.device_id}.{srv_type}' + srv_port = port if port else getattr(self, 'port', None) self.zeroconf_info = ServiceInfo( srv_type, srv_name, @@ -439,9 +436,10 @@ class Backend(Thread, EventGenerator, ExtensionWithManifest): self.zeroconf.unregister_service(self.zeroconf_info) except Exception as e: self.logger.warning( - 'Could not register Zeroconf service {}: {}: {}'.format( - self.zeroconf_info.name, type(e).__name__, str(e) - ) + 'Could not register Zeroconf service %s: %s: %s', + self.zeroconf_info.name, + type(e).__name__, + e, ) if self.zeroconf: diff --git a/platypush/backend/google/pubsub/__init__.py b/platypush/backend/google/pubsub/__init__.py index 524c20c3..7d1aa984 100644 --- a/platypush/backend/google/pubsub/__init__.py +++ b/platypush/backend/google/pubsub/__init__.py @@ -5,7 +5,6 @@ from typing import Optional, List from platypush.backend import Backend from platypush.context import get_plugin from platypush.message.event.google.pubsub import GooglePubsubMessageEvent -from platypush.utils import set_thread_name class GooglePubsubBackend(Backend): @@ -25,7 +24,9 @@ class GooglePubsubBackend(Backend): """ - def __init__(self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs): + def __init__( + self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs + ): """ :param topics: List of topics to subscribe. You can either specify the full topic name in the format ``projects//topics/``, where ```` must be the ID of your @@ -35,7 +36,7 @@ class GooglePubsubBackend(Backend): ``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``). """ - super().__init__(*args, **kwargs) + super().__init__(*args, name='GooglePubSub', **kwargs) self.topics = topics if credentials_file: @@ -46,7 +47,9 @@ class GooglePubsubBackend(Backend): @staticmethod def _get_plugin(): - return get_plugin('google.pubsub') + plugin = get_plugin('google.pubsub') + assert plugin, 'google.pubsub plugin not enabled' + return plugin def _message_callback(self, topic): def callback(msg): @@ -54,7 +57,7 @@ class GooglePubsubBackend(Backend): try: data = json.loads(data) except Exception as e: - self.logger.debug('Not a valid JSON: {}: {}'.format(data, str(e))) + self.logger.debug('Not a valid JSON: %s: %s', data, e) msg.ack() self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data)) @@ -64,20 +67,23 @@ class GooglePubsubBackend(Backend): def run(self): # noinspection PyPackageRequirements from google.cloud import pubsub_v1 + # noinspection PyPackageRequirements from google.api_core.exceptions import AlreadyExists super().run() - set_thread_name('GooglePubSub') plugin = self._get_plugin() project_id = plugin.get_project_id() credentials = plugin.get_credentials(plugin.subscriber_audience) subscriber = pubsub_v1.SubscriberClient(credentials=credentials) for topic in self.topics: - if not topic.startswith('projects/{}/topics/'.format(project_id)): - topic = 'projects/{}/topics/{}'.format(project_id, topic) - subscription_name = '/'.join([*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]) + prefix = f'projects/{project_id}/topics/' + if not topic.startswith(prefix): + topic = f'{prefix}{topic}' + subscription_name = '/'.join( + [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] + ) try: subscriber.create_subscription(name=subscription_name, topic=topic) diff --git a/platypush/backend/http/request/__init__.py b/platypush/backend/http/request/__init__.py index d6980471..bfce6523 100644 --- a/platypush/backend/http/request/__init__.py +++ b/platypush/backend/http/request/__init__.py @@ -1,28 +1,36 @@ import logging import re -import requests +from threading import Thread import time +import requests from frozendict import frozendict -from threading import Thread from platypush.message.event.http import HttpEvent -from platypush.utils import set_thread_name -class HttpRequest(object): +class HttpRequest: + """ + Backend used for polling HTTP resources. + """ + poll_seconds = 60 timeout = 5 - class HttpRequestArguments(object): - def __init__(self, url, method='get', *args, **kwargs): + class HttpRequestArguments: + """ + Models the properties of an HTTP request. + """ + + def __init__(self, url, *args, method='get', **kwargs): self.method = method.lower() self.url = url self.args = args self.kwargs = kwargs - def __init__(self, args, bus=None, poll_seconds=None, timeout=None, - skip_first_call=True, **kwargs): + def __init__( + self, args, bus=None, poll_seconds=None, timeout=None, skip_first_call=True, **_ + ): super().__init__() self.poll_seconds = poll_seconds or self.poll_seconds @@ -43,12 +51,13 @@ class HttpRequest(object): self.args.kwargs['timeout'] = self.timeout self.request_args = { - 'method': self.args.method, 'url': self.args.url, **self.args.kwargs + 'method': self.args.method, + 'url': self.args.url, + **self.args.kwargs, } def execute(self): def _thread_func(): - set_thread_name('HttpPoll') is_first_call = self.last_request_timestamp == 0 self.last_request_timestamp = time.time() @@ -63,30 +72,45 @@ class HttpRequest(object): else: event = HttpEvent(dict(self), new_items) - if new_items and self.bus: - if not self.skip_first_call or ( - self.skip_first_call and not is_first_call): - self.bus.post(event) + if ( + new_items + and self.bus + and ( + not self.skip_first_call + or (self.skip_first_call and not is_first_call) + ) + ): + self.bus.post(event) response.raise_for_status() except Exception as e: self.logger.exception(e) - self.logger.warning('Encountered an error while retrieving {}: {}'. - format(self.args.url, str(e))) + self.logger.warning( + 'Encountered an error while retrieving %s: %s', self.args.url, e + ) Thread(target=_thread_func, name='HttpPoll').start() def get_new_items(self, response): - """ Gets new items out of a response """ - raise NotImplementedError("get_new_items must be implemented in a derived class") + """Gets new items out of a response""" + raise NotImplementedError( + "get_new_items must be implemented in a derived class" + ) def __iter__(self): - for (key, value) in self.request_args.items(): + """ + :return: The ``request_args`` as key-value pairs. + """ + for key, value in self.request_args.items(): yield key, value class JsonHttpRequest(HttpRequest): - def __init__(self, path=None, *args, **kwargs): + """ + Specialization of the HttpRequest class for JSON requests. + """ + + def __init__(self, *args, path=None, **kwargs): super().__init__(*args, **kwargs) self.path = path self.seen_entries = set() @@ -97,7 +121,8 @@ class JsonHttpRequest(HttpRequest): if self.path: m = re.match(r'\${\s*(.*)\s*}', self.path) - response = eval(m.group(1)) + if m: + response = eval(m.group(1)) # pylint: disable=eval-used for entry in response: flattened_entry = deep_freeze(entry) @@ -109,6 +134,11 @@ class JsonHttpRequest(HttpRequest): def deep_freeze(x): + """ + Deep freezes a Python object - works for strings, dictionaries, sets and + iterables. + """ + if isinstance(x, str) or not hasattr(x, "__len__"): return x if hasattr(x, "keys") and hasattr(x, "values"): @@ -118,4 +148,5 @@ def deep_freeze(x): return frozenset(map(deep_freeze, x)) + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/music/snapcast/__init__.py b/platypush/backend/music/snapcast/__init__.py index 53ea555c..89d6303f 100644 --- a/platypush/backend/music/snapcast/__init__.py +++ b/platypush/backend/music/snapcast/__init__.py @@ -5,11 +5,17 @@ import threading import time from platypush.backend import Backend -from platypush.utils import set_thread_name -from platypush.message.event.music.snapcast import ClientVolumeChangeEvent, \ - GroupMuteChangeEvent, ClientConnectedEvent, ClientDisconnectedEvent, \ - ClientLatencyChangeEvent, ClientNameChangeEvent, GroupStreamChangeEvent, \ - StreamUpdateEvent, ServerUpdateEvent +from platypush.message.event.music.snapcast import ( + ClientVolumeChangeEvent, + GroupMuteChangeEvent, + ClientConnectedEvent, + ClientDisconnectedEvent, + ClientLatencyChangeEvent, + ClientNameChangeEvent, + GroupStreamChangeEvent, + StreamUpdateEvent, + ServerUpdateEvent, +) class MusicSnapcastBackend(Backend): @@ -31,11 +37,17 @@ class MusicSnapcastBackend(Backend): """ _DEFAULT_SNAPCAST_PORT = 1705 - _DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds + _DEFAULT_POLL_SECONDS = 10 # Poll servers each 10 seconds _SOCKET_EOL = '\r\n'.encode() - def __init__(self, hosts=None, ports=None, - poll_seconds=_DEFAULT_POLL_SECONDS, *args, **kwargs): + def __init__( + self, + hosts=None, + ports=None, + poll_seconds=_DEFAULT_POLL_SECONDS, + *args, + **kwargs, + ): """ :param hosts: List of Snapcast server names or IPs to monitor (default: `['localhost']` @@ -72,24 +84,25 @@ class MusicSnapcastBackend(Backend): if self._socks.get(host): return self._socks[host] - self.logger.debug('Connecting to {host}:{port}'.format(host=host, port=port)) + self.logger.debug('Connecting to %s:%d', host, port) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect((host, port)) self._socks[host] = sock - self.logger.info('Connected to {host}:{port}'.format(host=host, port=port)) + self.logger.info('Connected to %s:%d', host, port) return sock def _disconnect(self, host, port): sock = self._socks.get(host) if not sock: - self.logger.debug('Not connected to {}:{}'.format(host, port)) + self.logger.debug('Not connected to %s:%d', host, port) return try: sock.close() except Exception as e: - self.logger.warning(('Exception while disconnecting from {host}:{port}: {error}'. - format(host=host, port=port, error=str(e)))) + self.logger.warning( + 'Exception while disconnecting from %s:%d: %s', host, port, e + ) finally: self._socks[host] = None @@ -103,7 +116,7 @@ class MusicSnapcastBackend(Backend): if ready[0]: buf += sock.recv(1) else: - return + return None return json.loads(buf.decode().strip()) @@ -115,8 +128,9 @@ class MusicSnapcastBackend(Backend): client_id = msg.get('params', {}).get('id') volume = msg.get('params', {}).get('volume', {}).get('percent') muted = msg.get('params', {}).get('volume', {}).get('muted') - evt = ClientVolumeChangeEvent(host=host, client=client_id, - volume=volume, muted=muted) + evt = ClientVolumeChangeEvent( + host=host, client=client_id, volume=volume, muted=muted + ) elif msg.get('method') == 'Group.OnMute': group_id = msg.get('params', {}).get('id') muted = msg.get('params', {}).get('mute') @@ -149,10 +163,8 @@ class MusicSnapcastBackend(Backend): return evt - def _client(self, host, port, thread_name): + def _client(self, host, port): def _thread(): - set_thread_name(thread_name) - while not self.should_stop(): try: sock = self._connect(host, port) @@ -164,16 +176,20 @@ class MusicSnapcastBackend(Backend): msgs = [msgs] for msg in msgs: - self.logger.debug('Received message on {host}:{port}: {msg}'. - format(host=host, port=port, msg=msg)) + self.logger.debug( + 'Received message on {host}:{port}: {msg}'.format( + host=host, port=port, msg=msg + ) + ) - # noinspection PyTypeChecker evt = self._parse_msg(host=host, msg=msg) if evt: self.bus.post(evt) except Exception as e: - self.logger.warning('Exception while getting the status ' + 'of the Snapcast server {}:{}: {}'. - format(host, port, str(e))) + self.logger.warning( + 'Exception while getting the status ' + + 'of the Snapcast server {}:{}: {}'.format(host, port, str(e)) + ) self._disconnect(host, port) finally: @@ -184,17 +200,19 @@ class MusicSnapcastBackend(Backend): def run(self): super().run() - self.logger.info('Initialized Snapcast backend - hosts: {} ports: {}'. - format(self.hosts, self.ports)) + self.logger.info( + 'Initialized Snapcast backend - hosts: {} ports: {}'.format( + self.hosts, self.ports + ) + ) while not self.should_stop(): for i, host in enumerate(self.hosts): port = self.ports[i] - thread_name = 'Snapcast-{host}-{port}'.format(host=host, port=port) + thread_name = f'Snapcast-{host}-{port}' self._threads[host] = threading.Thread( - target=self._client(host, port, thread_name), - name=thread_name + target=self._client(host, port), name=thread_name ) self._threads[host].start() @@ -211,8 +229,11 @@ class MusicSnapcastBackend(Backend): try: sock.close() except Exception as e: - self.logger.warning('Could not close Snapcast connection to {}: {}: {}'.format( - host, type(e), str(e))) + self.logger.warning( + 'Could not close Snapcast connection to {}: {}: {}'.format( + host, type(e), str(e) + ) + ) # vim:sw=4:ts=4:et: diff --git a/platypush/backend/tcp/__init__.py b/platypush/backend/tcp/__init__.py index 0224c6a3..2d83fedd 100644 --- a/platypush/backend/tcp/__init__.py +++ b/platypush/backend/tcp/__init__.py @@ -6,7 +6,6 @@ from typing import Optional from platypush.backend import Backend from platypush.message import Message -from platypush.utils import set_thread_name class TcpBackend(Backend): @@ -17,19 +16,20 @@ class TcpBackend(Backend): # Maximum length of a request to be processed _MAX_REQ_SIZE = 2048 - def __init__(self, port, bind_address=None, listen_queue=5, *args, **kwargs): + def __init__(self, port, bind_address=None, listen_queue=5, **kwargs): """ :param port: TCP port number :type port: int - :param bind_address: Specify a bind address if you want to hook the service to a specific interface (default: listen for any connections) + :param bind_address: Specify a bind address if you want to hook the + service to a specific interface (default: listen for any connections). :type bind_address: str :param listen_queue: Maximum number of queued connections (default: 5) :type listen_queue: int """ - super().__init__(*args, **kwargs) + super().__init__(name=self.__class__.__name__, **kwargs) self.port = port self.bind_address = bind_address or '0.0.0.0' @@ -46,8 +46,11 @@ class TcpBackend(Backend): while not self.should_stop(): if processed_bytes > self._MAX_REQ_SIZE: - self.logger.warning('Ignoring message longer than {} bytes from {}' - .format(self._MAX_REQ_SIZE, address[0])) + self.logger.warning( + 'Ignoring message longer than {} bytes from {}'.format( + self._MAX_REQ_SIZE, address[0] + ) + ) return ch = sock.recv(1) @@ -76,17 +79,16 @@ class TcpBackend(Backend): return msg = Message.build(msg) - self.logger.info('Received request from {}: {}'.format(msg, address[0])) + self.logger.info('Received request from %s: %s', msg, address[0]) self.on_message(msg) response = self.get_message_response(msg) - self.logger.info('Processing response on the TCP backend: {}'.format(response)) + self.logger.info('Processing response on the TCP backend: %s', response) if response: sock.send(str(response).encode()) def _f_wrapper(): - set_thread_name('TCPListener') try: _f() finally: @@ -111,11 +113,16 @@ class TcpBackend(Backend): serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) serv_sock.settimeout(0.5) - self.logger.info('Initialized TCP backend on port {} with bind address {}'. - format(self.port, self.bind_address)) + self.logger.info( + 'Initialized TCP backend on port %s with bind address %s', + self.port, + self.bind_address, + ) serv_sock.listen(self.listen_queue) - self._srv = multiprocessing.Process(target=self._accept_process, args=(serv_sock,)) + self._srv = multiprocessing.Process( + target=self._accept_process, args=(serv_sock,) + ) self._srv.start() while not self.should_stop(): @@ -124,7 +131,7 @@ class TcpBackend(Backend): except (socket.timeout, queue.Empty): continue - self.logger.info('Accepted connection from client {}'.format(address[0])) + self.logger.info('Accepted connection from client %s', address[0]) self._process_client(sock, address) if self._srv: diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 445874aa..55b40a98 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -8,7 +8,7 @@ import croniter from dateutil.tz import gettz from platypush.procedure import Procedure -from platypush.utils import is_functional_cron, set_thread_name +from platypush.utils import is_functional_cron logger = logging.getLogger('platypush:cron') @@ -52,7 +52,7 @@ class Cronjob(threading.Thread): """ def __init__(self, name, cron_expression, actions): - super().__init__() + super().__init__(name=f'cron:{name}') self.cron_expression = cron_expression self.name = name self.state = CronjobState.IDLE @@ -79,7 +79,6 @@ class Cronjob(threading.Thread): """ Inner logic of the cronjob thread. """ - set_thread_name(f'cron:{self.name}') # Wait until an event is received or the next execution slot is reached self.wait() @@ -203,7 +202,7 @@ class CronScheduler(threading.Thread): logger.info('Running cron scheduler') while not self.should_stop(): - for (job_name, job_config) in self.jobs_config.items(): + for job_name, job_config in self.jobs_config.items(): job = self._get_job(name=job_name, config=job_config) if job.state == CronjobState.IDLE: try: diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index c23c4b34..cdd90874 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -5,7 +5,6 @@ from typing import Dict, Optional from platypush.context import get_bus from platypush.entities import Entity from platypush.message.event.entities import EntityUpdateEvent -from platypush.utils import set_thread_name from platypush.entities._base import EntityKey, EntitySavedCallback from platypush.entities._engine.queue import EntitiesQueue @@ -99,7 +98,6 @@ class EntitiesEngine(Thread): def run(self): super().run() - set_thread_name('entities') self.logger.info('Started entities engine') self._running.set() diff --git a/platypush/event/hook.py b/platypush/event/hook.py index 35cb052d..8eda7dd5 100644 --- a/platypush/event/hook.py +++ b/platypush/event/hook.py @@ -3,20 +3,23 @@ import json import logging import threading from functools import wraps +from typing import Optional, Type from platypush.common import exec_wrapper from platypush.config import Config from platypush.message.event import Event from platypush.message.request import Request from platypush.procedure import Procedure -from platypush.utils import get_event_class_by_type, set_thread_name, is_functional_hook +from platypush.utils import get_event_class_by_type, is_functional_hook logger = logging.getLogger('platypush') def parse(msg): - """Builds a dict given another dictionary or - a JSON UTF-8 encoded string/bytearray""" + """ + Builds a dict given another dictionary or a JSON UTF-8 encoded + string/bytearray. + """ if isinstance(msg, (bytes, bytearray)): msg = msg.decode('utf-8') @@ -24,16 +27,18 @@ def parse(msg): try: msg = json.loads(msg.strip()) except json.JSONDecodeError: - logger.warning('Invalid JSON message: {}'.format(msg)) + logger.warning('Invalid JSON message: %s', msg) return None return msg +# pylint: disable=too-few-public-methods class EventCondition: """Event hook condition class""" - def __init__(self, type=Event.__class__, priority=None, **kwargs): + # pylint: disable=redefined-builtin + def __init__(self, type: Optional[Type[Event]] = None, priority=None, **kwargs): """ Rule constructor. Params: @@ -42,40 +47,40 @@ class EventCondition: or recognized_phrase='Your phrase') """ - self.type = type + self.type = type or Event.__class__ # type: ignore self.args = {} self.parsed_args = {} self.priority = priority - for (key, value) in kwargs.items(): - # TODO So far we only allow simple value match. If value is a dict - # instead, we should allow more a sophisticated attribute matching, - # e.g. or conditions, in, and other operators. + for key, value in kwargs.items(): self.args[key] = value @classmethod def build(cls, rule): - """Builds a rule given either another EventRule, a dictionary or - a JSON UTF-8 encoded string/bytearray""" + """ + Builds a rule given either another EventRule, a dictionary or a JSON + UTF-8 encoded string/bytearray. + """ if isinstance(rule, cls): return rule - else: - rule = parse(rule) + rule = parse(rule) assert isinstance(rule, dict), f'Not a valid rule: {rule}' type = get_event_class_by_type(rule.pop('type') if 'type' in rule else 'Event') args = {} - for (key, value) in rule.items(): + for key, value in rule.items(): args[key] = value return cls(type=type, **args) class EventAction(Request): - """Event hook action class. It is a special type of runnable request - whose fields can be configured later depending on the event context""" + """ + Event hook action class. It is a special type of runnable request whose + fields can be configured later depending on the event context. + """ def __init__(self, target=None, action=None, **args): if target is None: @@ -84,30 +89,34 @@ class EventAction(Request): super().__init__(target=target, action=action, **args_copy) @classmethod - def build(cls, action): - action = super().parse(action) - action['origin'] = Config.get('device_id') + def build(cls, msg): + msg = super().parse(msg) + msg['origin'] = Config.get('device_id') - if 'target' not in action: - action['target'] = action['origin'] + if 'target' not in msg: + msg['target'] = msg['origin'] token = Config.get('token') if token: - action['token'] = token + msg['token'] = token - return super().build(action) + return super().build(msg) class EventHook: - """Event hook class. It consists of one conditions and - one or multiple actions to be executed""" + """ + Event hook class. It consists of one conditions and one or multiple actions + to be executed. + """ def __init__(self, name, priority=None, condition=None, actions=None): - """Constructor. Takes a name, a EventCondition object and an event action - procedure as input. It may also have a priority attached - as a positive number. If multiple hooks match against an event, - only the ones that have either the maximum match score or the - maximum pre-configured priority will be run.""" + """ + Takes a name, a EventCondition object and an event action procedure as + input. It may also have a priority attached as a positive number. If + multiple hooks match against an event, only the ones that have either + the maximum match score or the maximum pre-configured priority will be + run. + """ self.name = name self.condition = EventCondition.build(condition or {}) @@ -116,24 +125,28 @@ class EventHook: self.condition.priority = self.priority @classmethod - def build(cls, name, hook): - """Builds a rule given either another EventRule, a dictionary or - a JSON UTF-8 encoded string/bytearray""" + def build(cls, name, hook): # pylint: disable=redefined-outer-name + """ + Builds a rule given either another EventRule, a dictionary or a JSON + UTF-8 encoded string/bytearray. + """ if isinstance(hook, cls): return hook - else: - hook = parse(hook) + hook = parse(hook) if is_functional_hook(hook): actions = Procedure(name=name, requests=[hook], _async=False) - return cls(name=name, condition=hook.condition, actions=actions) + return cls( + name=name, condition=getattr(hook, 'condition', None), actions=actions + ) assert isinstance(hook, dict) condition = EventCondition.build(hook['if']) if 'if' in hook else None actions = [] priority = hook['priority'] if 'priority' in hook else None - condition.priority = priority + if condition: + condition.priority = priority if 'then' in hook: if isinstance(hook['then'], list): @@ -145,29 +158,38 @@ class EventHook: return cls(name=name, condition=condition, actions=actions, priority=priority) def matches_event(self, event): - """Returns an EventMatchResult object containing the information - about the match between the event and this hook""" + """ + Returns an EventMatchResult object containing the information about the + match between the event and this hook. + """ return event.matches_condition(self.condition) def run(self, event): - """Checks the condition of the hook against a particular event and - runs the hook actions if the condition is met""" + """ + Checks the condition of the hook against a particular event and runs + the hook actions if the condition is met. + """ def _thread_func(result): - set_thread_name('Event-' + self.name) - self.actions.execute(event=event, **result.parsed_args) + executor = getattr(self.actions, 'execute', None) + if executor and callable(executor): + executor(event=event, **result.parsed_args) result = self.matches_event(event) if result.is_match: - logger.info('Running hook {} triggered by an event'.format(self.name)) + logger.info('Running hook %s triggered by an event', self.name) threading.Thread( target=_thread_func, name='Event-' + self.name, args=(result,) ).start() def hook(event_type=Event, **condition): + """ + Decorator used for event hook functions. + """ + def wrapper(f): f.hook = True f.condition = EventCondition(type=event_type, **condition) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 7ada15ea..57608a28 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -30,7 +30,7 @@ class Request(Message): target, action, origin=None, - id=None, + id=None, # pylint: disable=redefined-builtin backend=None, args=None, token=None, @@ -109,8 +109,6 @@ class Request(Message): return proc.execute(*args, **kwargs) def _expand_context(self, event_args=None, **context): - from platypush.config import Config - if event_args is None: event_args = copy.deepcopy(self.args) @@ -138,16 +136,19 @@ class Request(Message): return event_args @classmethod + # pylint: disable=too-many-branches def expand_value_from_context(cls, _value, **context): for k, v in context.items(): if isinstance(v, Message): v = json.loads(str(v)) try: - exec('{}={}'.format(k, v)) + exec('{}={}'.format(k, v)) # pylint: disable=exec-used except Exception: if isinstance(v, str): try: - exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v))) + exec( # pylint: disable=exec-used + '{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v)) + ) except Exception as e2: logger.debug( 'Could not set context variable %s=%s: %s', k, v, e2 @@ -167,7 +168,7 @@ class Request(Message): _value = m.group(4) try: - context_value = eval(inner_expr) + context_value = eval(inner_expr) # pylint: disable=eval-used if callable(context_value): context_value = context_value() @@ -209,6 +210,7 @@ class Request(Message): redis.send_message(queue_name, response) redis.expire(queue_name, 60) + # pylint: disable=too-many-statements def execute(self, n_tries=1, _async=True, **context): """ Execute this request and returns a Response object @@ -224,6 +226,7 @@ class Request(Message): - group: ${group_name} # will be expanded as "Kitchen lights") """ + # pylint: disable=too-many-branches def _thread_func(_n_tries, errors=None): from platypush.context import get_bus from platypush.plugins import RunnablePlugin diff --git a/platypush/plugins/light/hue/__init__.py b/platypush/plugins/light/hue/__init__.py index deef5466..3b0b8c9c 100644 --- a/platypush/plugins/light/hue/__init__.py +++ b/platypush/plugins/light/hue/__init__.py @@ -24,7 +24,6 @@ from platypush.message.event.light import ( LightStatusChangeEvent, ) from platypush.plugins import RunnablePlugin, action -from platypush.utils import set_thread_name class LightHuePlugin(RunnablePlugin, LightEntityManager): @@ -1054,7 +1053,6 @@ class LightHuePlugin(RunnablePlugin, LightEntityManager): return self._animation_stop.is_set() def _animate_thread(lights): - set_thread_name('HueAnimate') get_bus().post( LightAnimationStartedEvent( lights=lights, @@ -1209,7 +1207,7 @@ class LightHuePlugin(RunnablePlugin, LightEntityManager): def status(self, *_, **__) -> Iterable[LightEntity]: lights = self.transform_entities(self._get_lights(publish_entities=True)) for light in lights: - light.id = light.external_id + light.id = light.external_id # type: ignore for attr, value in (light.data or {}).items(): setattr(light, attr, value) diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 56dc6a50..12547b98 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -89,7 +89,7 @@ def get_plugin_class_by_name(plugin_name): module = get_plugin_module_by_name(plugin_name) if not module: - return + return None class_name = getattr( module, ''.join([_.capitalize() for _ in plugin_name.split('.')]) + 'Plugin' @@ -126,7 +126,7 @@ def get_backend_class_by_name(backend_name: str): module = get_backend_module_by_name(backend_name) if not module: - return + return None class_name = getattr( module, @@ -149,7 +149,7 @@ def get_backend_class_by_name(backend_name: str): return None -def get_backend_name_by_class(backend) -> Optional[str]: +def get_backend_name_by_class(backend) -> str: """Gets the common name of a backend (e.g. "http" or "mqtt") given its class.""" from platypush.backend import Backend @@ -206,12 +206,14 @@ def get_decorators(cls, climb_class_hierarchy=False): decorators = {} - # noinspection PyPep8Naming def visit_FunctionDef(node): for n in node.decorator_list: if isinstance(n, ast.Call): - # noinspection PyUnresolvedReferences - name = n.func.attr if isinstance(n.func, ast.Attribute) else n.func.id + name = ( + n.func.attr + if isinstance(n.func, ast.Attribute) + else n.func.id # type: ignore + ) else: name = n.attr if isinstance(n, ast.Attribute) else n.id @@ -257,6 +259,7 @@ def _get_ssl_context( else: ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + assert ssl_cert, 'No certificate specified' if ssl_cafile or ssl_capath: ssl_context.load_verify_locations(cafile=ssl_cafile, capath=ssl_capath) @@ -311,18 +314,6 @@ def get_ssl_client_context( ) -def set_thread_name(name: str): - """ - Set the name of the current thread. - """ - try: - import prctl - - prctl.set_name(name) # pylint: disable=no-member - except ImportError: - logger.debug('Unable to set thread name: prctl module is missing') - - def find_bins_in_path(bin_name): """ Search for a binary in the PATH variable. @@ -399,7 +390,6 @@ def get_mime_type(resource: str) -> Optional[str]: offset = len('file://') resource = resource[offset:] - # noinspection HttpUrlsUsage if resource.startswith('http://') or resource.startswith('https://'): with urllib.request.urlopen(resource) as response: return response.info().get_content_type() @@ -442,6 +432,8 @@ def grouper(n, iterable, fillvalue=None): for chunk in zip_longest(*args): yield filter(None, chunk) + return + def is_functional_procedure(obj) -> bool: """ @@ -529,7 +521,7 @@ def get_or_generate_jwt_rsa_key_pair(): """ from platypush.config import Config - key_dir = os.path.join(Config.get('workdir'), 'jwt') + key_dir = os.path.join(Config.workdir, 'jwt') priv_key_file = os.path.join(key_dir, 'id_rsa') pub_key_file = priv_key_file + '.pub' diff --git a/requirements.txt b/requirements.txt index b0677cd2..d61383ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,7 +13,6 @@ marshmallow_dataclass paho-mqtt python-dateutil python-magic -python-prctl pyyaml redis requests diff --git a/setup.py b/setup.py index bff5aa15..f71f1262 100755 --- a/setup.py +++ b/setup.py @@ -83,8 +83,6 @@ setup( 'zeroconf>=0.27.0', ], extras_require={ - # Support for thread custom name - 'threadname': ['python-prctl'], # Support for Kafka backend and plugin 'kafka': ['kafka-python'], # Support for Pushbullet backend and plugin