From e49a0aec4dc5370a34e043931b59e0dcb215e80c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 8 Feb 2023 00:46:50 +0100 Subject: [PATCH] Various improvements. - Better synchronization logic on stop for `AsyncRunnablePlugin`. - Fixed several thread names by dropping `prctl.set_name` in favour of specifying the name directly on thread creation. - Several LINT fixes. --- platypush/backend/http/__init__.py | 48 ++++++++++---------- platypush/backend/mqtt/__init__.py | 7 +-- platypush/plugins/__init__.py | 59 ++++++++++++------------- platypush/plugins/ntfy/__init__.py | 70 +++++++++++++++--------------- 4 files changed, 91 insertions(+), 93 deletions(-) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 966ba615..c97e3b1d 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -6,14 +6,14 @@ from multiprocessing import Process try: from websockets.exceptions import ConnectionClosed - from websockets import serve as websocket_serve + from websockets import serve as websocket_serve # type: ignore except ImportError: - from websockets import ConnectionClosed, serve as websocket_serve + from websockets import ConnectionClosed, serve as websocket_serve # type: ignore from platypush.backend import Backend from platypush.backend.http.app import application from platypush.context import get_or_create_event_loop -from platypush.utils import get_ssl_server_context, set_thread_name +from platypush.utils import get_ssl_server_context class HttpBackend(Backend): @@ -186,7 +186,7 @@ class HttpBackend(Backend): maps=None, run_externally=False, uwsgi_args=None, - **kwargs + **kwargs, ): """ :param port: Listen port for the web server (default: 8008) @@ -283,15 +283,13 @@ class HttpBackend(Backend): '--enable-threads', ] - self.local_base_url = '{proto}://localhost:{port}'.format( - proto=('https' if ssl_cert else 'http'), port=self.port - ) - + protocol = 'https' if ssl_cert else 'http' + self.local_base_url = f'{protocol}://localhost:{self.port}' self._websocket_lock_timeout = 10 self._websocket_lock = threading.RLock() self._websocket_locks = {} - def send_message(self, msg, **kwargs): + def send_message(self, msg, *_, **__): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') def on_stop(self): @@ -351,9 +349,7 @@ class HttpBackend(Backend): timeout=self._websocket_lock_timeout ) if not acquire_ok: - raise TimeoutError( - 'Websocket on address {} not ready to receive data'.format(addr) - ) + raise TimeoutError(f'Websocket on address {addr} not ready to receive data') def _release_websocket_lock(self, ws): try: @@ -368,7 +364,7 @@ class HttpBackend(Backend): self._websocket_locks[addr].release() except Exception as e: self.logger.warning( - 'Unhandled exception while releasing websocket lock: {}'.format(str(e)) + 'Unhandled exception while releasing websocket lock: %s', e ) finally: self._websocket_lock.release() @@ -381,7 +377,7 @@ class HttpBackend(Backend): self._acquire_websocket_lock(ws) await ws.send(str(event)) except Exception as e: - self.logger.warning('Error on websocket send_event: {}'.format(e)) + self.logger.warning('Error on websocket send_event: %s', e) finally: self._release_websocket_lock(ws) @@ -393,7 +389,7 @@ class HttpBackend(Backend): loop.run_until_complete(send_event(_ws)) except ConnectionClosed: self.logger.warning( - 'Websocket client {} connection lost'.format(_ws.remote_address) + 'Websocket client %s connection lost', _ws.remote_address ) self.active_websockets.remove(_ws) if _ws.remote_address in self._websocket_locks: @@ -401,7 +397,6 @@ class HttpBackend(Backend): def websocket(self): """Websocket main server""" - set_thread_name('WebsocketServer') async def register_websocket(websocket, path): address = ( @@ -411,16 +406,14 @@ class HttpBackend(Backend): ) self.logger.info( - 'New websocket connection from {} on path {}'.format(address, path) + 'New websocket connection from %s on path %s', address, path ) self.active_websockets.add(websocket) try: await websocket.recv() except ConnectionClosed: - self.logger.info( - 'Websocket client {} closed connection'.format(address) - ) + self.logger.info('Websocket client %s closed connection', address) self.active_websockets.remove(websocket) if address in self._websocket_locks: del self._websocket_locks[address] @@ -435,14 +428,14 @@ class HttpBackend(Backend): register_websocket, self.bind_address, self.websocket_port, - **websocket_args + **websocket_args, ) ) self._websocket_loop.run_forever() def _start_web_server(self): def proc(): - self.logger.info('Starting local web server on port {}'.format(self.port)) + self.logger.info('Starting local web server on port %s', self.port) kwargs = { 'host': self.bind_address, 'port': self.port, @@ -470,7 +463,10 @@ class HttpBackend(Backend): if not self.disable_websocket: self.logger.info('Initializing websocket interface') - self.websocket_thread = threading.Thread(target=self.websocket) + self.websocket_thread = threading.Thread( + target=self.websocket, + name='WebsocketServer', + ) self.websocket_thread.start() if not self.run_externally: @@ -481,13 +477,13 @@ class HttpBackend(Backend): self.server_proc.join() elif self.uwsgi_args: uwsgi_cmd = ['uwsgi'] + self.uwsgi_args - self.logger.info('Starting uWSGI with arguments {}'.format(uwsgi_cmd)) + self.logger.info('Starting uWSGI with arguments %s', uwsgi_cmd) self.server_proc = subprocess.Popen(uwsgi_cmd) else: self.logger.info( 'The web server is configured to be launched externally but ' - + 'no uwsgi_args were provided. Make sure that you run another external service' - + 'for the webserver (e.g. nginx)' + 'no uwsgi_args were provided. Make sure that you run another external service' + 'for the web server (e.g. nginx)' ) self._service_registry_thread = threading.Thread(target=self._register_service) diff --git a/platypush/backend/mqtt/__init__.py b/platypush/backend/mqtt/__init__.py index 2dbc89b6..3ca89d32 100644 --- a/platypush/backend/mqtt/__init__.py +++ b/platypush/backend/mqtt/__init__.py @@ -13,7 +13,6 @@ from platypush.message import Message from platypush.message.event.mqtt import MQTTMessageEvent from platypush.message.request import Request from platypush.plugins.mqtt import MqttPlugin as MQTTPlugin -from platypush.utils import set_thread_name class MqttClient(mqtt.Client, threading.Thread): @@ -39,6 +38,7 @@ class MqttClient(mqtt.Client, threading.Thread): mqtt.Client.__init__(self, *args, client_id=client_id, **kwargs) threading.Thread.__init__(self) + self.name = f'MQTTClient:{client_id}' self.host = host self.port = port self.topics = set(topics or []) @@ -393,7 +393,6 @@ class MqttBackend(Backend): def handler(_, __, msg): # noinspection PyShadowingNames def response_thread(msg): - set_thread_name('MQTTProcessor') response = self.get_message_response(msg) if not response: return @@ -428,7 +427,9 @@ class MqttBackend(Backend): if isinstance(msg, Request): threading.Thread( - target=response_thread, name='MQTTProcessor', args=(msg,) + target=response_thread, + name='MQTTProcessorResponseThread', + args=(msg,), ).start() return handler diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index bdd88475..a3fb325c 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -1,7 +1,6 @@ import asyncio import logging import threading -import time from abc import ABC, abstractmethod from functools import wraps @@ -11,7 +10,7 @@ from platypush.bus import Bus from platypush.common import ExtensionWithManifest from platypush.event import EventGenerator from platypush.message.response import Response -from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name +from platypush.utils import get_decorators, get_plugin_name_by_class PLUGIN_STOP_TIMEOUT = 5 # Plugin stop timeout in seconds @@ -107,25 +106,22 @@ class RunnablePlugin(Plugin): return self._should_stop.wait(timeout=timeout) def start(self): - set_thread_name(self.__class__.__name__) - self._thread = threading.Thread(target=self._runner) + self._thread = threading.Thread( + target=self._runner, name=self.__class__.__name__ + ) self._thread.start() def stop(self): self._should_stop.set() if self._thread and self._thread.is_alive(): - self.logger.info('Waiting for %s to stop', self.__class__.__name__) + self.logger.info('Waiting for the plugin to stop') try: if self._thread: self._thread.join(timeout=self._stop_timeout) if self._thread and self._thread.is_alive(): self.logger.warning( - 'Timeout (seconds={%s}) on exit for the plugin %s', + 'Timeout (seconds=%s) on exit for the plugin', self._stop_timeout, - ( - get_plugin_name_by_class(self.__class__) - or self.__class__.__name__ - ), ) except Exception as e: self.logger.warning('Could not join thread on stop: %s', e) @@ -142,7 +138,7 @@ class RunnablePlugin(Plugin): self.logger.exception(e) if self.poll_interval: - time.sleep(self.poll_interval) + self.wait_stop(self.poll_interval) self._thread = None @@ -156,18 +152,27 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC): super().__init__(*args, **kwargs) self._loop: Optional[asyncio.AbstractEventLoop] = None - self._loop_runner: Optional[threading.Thread] = None self._task: Optional[asyncio.Task] = None @property def _should_start_runner(self): + """ + This property is used to determine if the runner and the event loop + should be started for this plugin. + """ return True @abstractmethod async def listen(self): - pass + """ + Main body of the async plugin. When it's called, the event loop should + already be running and available over `self._loop`. + """ async def _listen(self): + """ + Wrapper for :meth:`.listen` that catches any exceptions and logs them. + """ try: await self.listen() except KeyboardInterrupt: @@ -179,41 +184,37 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC): ): raise e - def _start_listener(self): - set_thread_name(self.__class__.__name__ + ':listener') + def _run_listener(self): self._loop = asyncio.new_event_loop() asyncio.set_event_loop(self._loop) self._task = self._loop.create_task(self._listen()) if hasattr(self._task, 'set_name'): self._task.set_name(self.__class__.__name__ + '.listen') - self._loop.run_forever() + try: + self._loop.run_until_complete(self._task) + except Exception as e: + self.logger.info('The loop has terminated with an error: %s', e) + + self._task.cancel() def main(self): - if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()): - self.logger.info('The main loop is already being run/stopped') + if self.should_stop(): + self.logger.info('The plugin is already scheduled to stop') return + self._loop = asyncio.new_event_loop() + if self._should_start_runner: - self._loop_runner = threading.Thread(target=self._start_listener) - self._loop_runner.start() + self._run_listener() self.wait_stop() def stop(self): - if self._task and self._loop and not self._task.done(): - self._loop.call_soon_threadsafe(self._task.cancel) - if self._loop and self._loop.is_running(): self._loop.call_soon_threadsafe(self._loop.stop) self._loop = None - if self._loop_runner and self._loop_runner.is_alive(): - try: - self._loop_runner.join(timeout=self._stop_timeout) - finally: - self._loop_runner = None - super().stop() diff --git a/platypush/plugins/ntfy/__init__.py b/platypush/plugins/ntfy/__init__.py index a24b46a5..9f621d23 100644 --- a/platypush/plugins/ntfy/__init__.py +++ b/platypush/plugins/ntfy/__init__.py @@ -1,7 +1,7 @@ import asyncio import json import os -from typing import Optional, Collection, Mapping +from typing import Any, Callable, Dict, Optional, Collection, Mapping import requests import websockets @@ -53,12 +53,12 @@ class NtfyPlugin(AsyncRunnablePlugin): reconnect_wait_secs_max = 60 while not self.should_stop(): - self.logger.debug(f'Connecting to {url}') + self.logger.debug('Connecting to %s', url) try: - async with websockets.connect(url) as ws: # type: ignore + async with websockets.connect(url) as ws: # pylint: disable=no-member reconnect_wait_secs = 1 - self.logger.info(f'Connected to {url}') + self.logger.info('Connected to %s', url) async for msg in ws: try: msg = json.loads(msg) @@ -122,7 +122,7 @@ class NtfyPlugin(AsyncRunnablePlugin): tags: Optional[Collection[str]] = None, schedule: Optional[str] = None, ): - """ + r""" Send a message/notification to a topic. :param topic: Topic where the message will be delivered. @@ -148,36 +148,36 @@ class NtfyPlugin(AsyncRunnablePlugin): - ``broadcast``: Send an `Android broadcast ` intent upon action selection (only available on Android). - Example: + Actions example: - .. code-block:: json + .. code-block:: json - [ - { - "action": "view", - "label": "Open portal", - "url": "https://home.nest.com/", - "clear": true + [ + { + "action": "view", + "label": "Open portal", + "url": "https://home.nest.com/", + "clear": true + }, + { + "action": "http", + "label": "Turn down", + "url": "https://api.nest.com/", + "method": "PUT", + "headers": { + "Authorization": "Bearer abcdef..." }, - { - "action": "http", - "label": "Turn down", - "url": "https://api.nest.com/", - "method": "PUT", - "headers": { - "Authorization": "Bearer abcdef..." - }, - "body": "{\\"temperature\\": 65}" - }, - { - "action": "broadcast", - "label": "Take picture", - "intent": "com.myapp.TAKE_PICTURE_INTENT", - "extras": { - "camera": "front" - } + "body": "{\\"temperature\\": 65}" + }, + { + "action": "broadcast", + "label": "Take picture", + "intent": "com.myapp.TAKE_PICTURE_INTENT", + "extras": { + "camera": "front" } - ] + } + ] :param email: Forward the notification as an email to the specified address. @@ -196,9 +196,9 @@ class NtfyPlugin(AsyncRunnablePlugin): ``tomorrow, 3pm``) """ - method = requests.post + method: Callable[..., requests.Response] = requests.post url = server_url or self._server_url - args = {} + args: Dict[str, Any] = {} if username and password: args['auth'] = (username, password) @@ -247,8 +247,8 @@ class NtfyPlugin(AsyncRunnablePlugin): } rs = method(url, **args) - assert rs.ok, 'Could not send message to {}: {}'.format( - topic, rs.json().get('error', f'HTTP error: {rs.status_code}') + assert rs.ok, f'Could not send message to {topic}: ' + rs.json().get( + 'error', f'HTTP error: {rs.status_code}' ) return rs.json()