From 2e7f3d88681ed2beb54bb500902a44ac459f5ed0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 12 Aug 2022 15:22:04 +0200 Subject: [PATCH 1/5] Removed references to deprecated websockets attributes --- platypush/plugins/websocket/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/platypush/plugins/websocket/__init__.py b/platypush/plugins/websocket/__init__.py index 86716394a..8999b1247 100644 --- a/platypush/plugins/websocket/__init__.py +++ b/platypush/plugins/websocket/__init__.py @@ -135,9 +135,9 @@ class WebsocketPlugin(Plugin): time_start = time.time() time_end = time_start + timeout if timeout else 0 url = 'ws{secure}://{host}:{port}{path}'.format( - secure='s' if ws.secure else '', - host=ws.host, - port=ws.port, + secure='s' if ws._secure else '', + host=ws.remote_address[0], + port=ws.remote_address[1], path=ws.path, ) From 4043878afd0803f1553c21238f18baf62e9623ce Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Aug 2022 00:45:29 +0200 Subject: [PATCH 2/5] Refactored concurrency model in ntfy plugin --- platypush/plugins/ntfy/__init__.py | 48 ++++++++++++++---------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/platypush/plugins/ntfy/__init__.py b/platypush/plugins/ntfy/__init__.py index 064275723..8619df7a1 100644 --- a/platypush/plugins/ntfy/__init__.py +++ b/platypush/plugins/ntfy/__init__.py @@ -7,6 +7,7 @@ from typing import Optional, Collection, Mapping import requests import websockets +import websockets.exceptions from platypush.context import get_bus from platypush.message.event.ntfy import NotificationEvent @@ -48,23 +49,14 @@ class NtfyPlugin(RunnablePlugin): ] ) - self._event_loop: Optional[asyncio.AbstractEventLoop] = None self._subscriptions = subscriptions or [] self._ws_proc = None - def _connect(self): - if self.should_stop() or (self._ws_proc and self._ws_proc.is_alive()): - self.logger.debug('Already connected') - return - - self._ws_proc = multiprocessing.Process(target=self._ws_process) - self._ws_proc.start() - async def _get_ws_handler(self, url): reconnect_wait_secs = 1 reconnect_wait_secs_max = 60 - while True: + while not self.should_stop(): self.logger.debug(f'Connecting to {url}') try: @@ -104,32 +96,38 @@ class NtfyPlugin(RunnablePlugin): reconnect_wait_secs * 2, reconnect_wait_secs_max ) - async def _ws_processor(self, urls): - await asyncio.wait([self._get_ws_handler(url) for url in urls]) - def _ws_process(self): - self._event_loop = get_or_create_event_loop() + loop = get_or_create_event_loop() try: - self._event_loop.run_until_complete( - self._ws_processor( - {f'{self._ws_url}/{sub}/ws' for sub in self._subscriptions} + loop.run_until_complete( + asyncio.wait( + { + self._get_ws_handler(f'{self._ws_url}/{sub}/ws') + for sub in self._subscriptions + } ) ) except KeyboardInterrupt: pass def main(self): - if self._subscriptions: - self._connect() + if self.should_stop() or (self._ws_proc and self._ws_proc.is_alive()): + self.logger.debug('Already connected') + return - while not self._should_stop.is_set(): - self._should_stop.wait(timeout=1) + if self._subscriptions: + self._ws_proc = multiprocessing.Process(target=self._ws_process) + self._ws_proc.start() + + self.wait_stop() def stop(self): - if self._ws_proc: - self._ws_proc.kill() - self._ws_proc.join() - self._ws_proc = None + if self._ws_proc and self._ws_proc.is_alive(): + self._ws_proc.terminate() + try: + self._ws_proc.join(timeout=3) + except TimeoutError: + self._ws_proc.kill() super().stop() From 3b1ab78268a19b853f41c34a2b8580cee69bc157 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Aug 2022 22:30:49 +0200 Subject: [PATCH 3/5] Added AsyncRunnablePlugin class. This class handles runnable plugins that have their own asyncio event loop, without the pain usually caused by the management of multiple threads + asyncio loops. --- platypush/plugins/__init__.py | 72 +++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index 8338620ec..9e3edab85 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -1,7 +1,9 @@ +import asyncio import logging import threading import time +from abc import ABC, abstractmethod from functools import wraps from typing import Optional @@ -110,4 +112,74 @@ class RunnablePlugin(Plugin): self._thread = None +class AsyncRunnablePlugin(RunnablePlugin, ABC): + """ + Class for runnable plugins with an asynchronous event loop attached. + """ + + def __init__(self, *args, _stop_timeout: Optional[float] = 30.0, **kwargs): + super().__init__(*args, **kwargs) + + self._stop_timeout = _stop_timeout + self._loop: Optional[asyncio.AbstractEventLoop] = None + self._loop_runner: Optional[threading.Thread] = None + self._task: Optional[asyncio.Task] = None + + @property + def _should_start_runner(self): + return True + + @abstractmethod + async def listen(self): + pass + + async def _listen(self): + try: + await self.listen() + except KeyboardInterrupt: + pass + except RuntimeError as e: + if not ( + str(e).startswith('Event loop stopped before ') + or str(e).startswith('no running event loop') + ): + raise e + + def _start_listener(self): + set_thread_name(self.__class__.__name__ + ':listener') + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + self._task = self._loop.create_task(self._listen()) + self._task.set_name(self.__class__.__name__ + '.listen') + self._loop.run_forever() + + def main(self): + if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()): + self.logger.info('The main loop is already being run/stopped') + return + + if self._should_start_runner: + self._loop_runner = threading.Thread(target=self._start_listener) + self._loop_runner.start() + + self.wait_stop() + + def stop(self): + if self._task and self._loop and not self._task.done(): + self._loop.call_soon_threadsafe(self._task.cancel) + + if self._loop and self._loop.is_running(): + self._loop.call_soon_threadsafe(self._loop.stop) + self._loop = None + + if self._loop_runner and self._loop_runner.is_alive(): + try: + self._loop_runner.join(timeout=self._stop_timeout) + finally: + self._loop_runner = None + + super().stop() + + # vim:sw=4:ts=4:et: From e17e65a7039e81734f20f9f9cf20cda817a95821 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Aug 2022 22:34:25 +0200 Subject: [PATCH 4/5] ntfy plugin migrated to AsyncRunnablePlugin. This commit removes a lot of the loop management boilerplate. --- platypush/plugins/ntfy/__init__.py | 55 ++++++++---------------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/platypush/plugins/ntfy/__init__.py b/platypush/plugins/ntfy/__init__.py index 8619df7a1..1d3faffbc 100644 --- a/platypush/plugins/ntfy/__init__.py +++ b/platypush/plugins/ntfy/__init__.py @@ -1,8 +1,6 @@ import asyncio import json -import multiprocessing import os -import time from typing import Optional, Collection, Mapping import requests @@ -11,11 +9,10 @@ import websockets.exceptions from platypush.context import get_bus from platypush.message.event.ntfy import NotificationEvent -from platypush.plugins import RunnablePlugin, action -from platypush.context import get_or_create_event_loop +from platypush.plugins import AsyncRunnablePlugin, action -class NtfyPlugin(RunnablePlugin): +class NtfyPlugin(AsyncRunnablePlugin): """ Ntfy integration. @@ -50,7 +47,6 @@ class NtfyPlugin(RunnablePlugin): ) self._subscriptions = subscriptions or [] - self._ws_proc = None async def _get_ws_handler(self, url): reconnect_wait_secs = 1 @@ -60,7 +56,7 @@ class NtfyPlugin(RunnablePlugin): self.logger.debug(f'Connecting to {url}') try: - async with websockets.connect(url) as ws: + async with websockets.connect(url) as ws: # type: ignore reconnect_wait_secs = 1 self.logger.info(f'Connected to {url}') async for msg in ws: @@ -91,45 +87,22 @@ class NtfyPlugin(RunnablePlugin): ) except websockets.exceptions.WebSocketException as e: self.logger.error('Websocket error: %s', e) - time.sleep(reconnect_wait_secs) + await asyncio.sleep(reconnect_wait_secs) reconnect_wait_secs = min( reconnect_wait_secs * 2, reconnect_wait_secs_max ) - def _ws_process(self): - loop = get_or_create_event_loop() - try: - loop.run_until_complete( - asyncio.wait( - { - self._get_ws_handler(f'{self._ws_url}/{sub}/ws') - for sub in self._subscriptions - } - ) - ) - except KeyboardInterrupt: - pass + async def listen(self): + return await asyncio.wait( + [ + self._get_ws_handler(f'{self._ws_url}/{sub}/ws') + for sub in set(self._subscriptions) + ] + ) - def main(self): - if self.should_stop() or (self._ws_proc and self._ws_proc.is_alive()): - self.logger.debug('Already connected') - return - - if self._subscriptions: - self._ws_proc = multiprocessing.Process(target=self._ws_process) - self._ws_proc.start() - - self.wait_stop() - - def stop(self): - if self._ws_proc and self._ws_proc.is_alive(): - self._ws_proc.terminate() - try: - self._ws_proc.join(timeout=3) - except TimeoutError: - self._ws_proc.kill() - - super().stop() + @property + def _should_start_runner(self): + return bool(self._subscriptions) @action def send_message( From 4e3c6a5c16550ca19bf4449bd085b491832a6d76 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 15 Aug 2022 00:14:52 +0200 Subject: [PATCH 5/5] The websocket plugin now extends AsyncRunnablePlugin too --- platypush/plugins/websocket/__init__.py | 58 ++++++++++++++++++------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/platypush/plugins/websocket/__init__.py b/platypush/plugins/websocket/__init__.py index 8999b1247..5744c075d 100644 --- a/platypush/plugins/websocket/__init__.py +++ b/platypush/plugins/websocket/__init__.py @@ -2,18 +2,20 @@ import asyncio import json import time -from websockets import connect as websocket_connect +from typing import Optional, Collection + +from websockets import connect as websocket_connect # type: ignore from websockets.exceptions import ConnectionClosed -from platypush.context import get_or_create_event_loop, get_bus +from platypush.context import get_bus from platypush.message.event.websocket import WebsocketMessageEvent -from platypush.plugins import Plugin, action +from platypush.plugins import AsyncRunnablePlugin, action from platypush.utils import get_ssl_client_context -class WebsocketPlugin(Plugin): +class WebsocketPlugin(AsyncRunnablePlugin): """ - Plugin to send messages over a websocket connection. + Plugin to send and receive messages over websocket connections. Triggers: @@ -22,6 +24,22 @@ class WebsocketPlugin(Plugin): """ + def __init__(self, subscriptions: Optional[Collection[str]] = None, **kwargs): + """ + :param subscriptions: List of websocket URLs that should be subscribed + at startup, prefixed by ``ws://`` or ``wss://``. + """ + super().__init__(**kwargs) + self._subscriptions = subscriptions or [] + + @property + def loop(self): + if not self._loop: + self._loop = asyncio.new_event_loop() + asyncio.set_event_loop(self._loop) + + return self._loop + @action def send( self, @@ -52,6 +70,8 @@ class WebsocketPlugin(Plugin): otherwise nothing. """ + msg = self._parse_msg(msg) + async def send(): websocket_args = { 'ssl': self._get_ssl_context( @@ -70,13 +90,11 @@ class WebsocketPlugin(Plugin): self.logger.warning('Error on websocket %s: %s', url, err) if wait_response: - messages = await self._ws_recv(ws, num_messages=1) + messages = await self._recv(ws, num_messages=1) if messages: return self._parse_msg(messages[0]) - msg = self._parse_msg(msg) - loop = get_or_create_event_loop() - return loop.run_until_complete(send()) + return asyncio.run_coroutine_threadsafe(send(), self.loop).result() @action def recv( @@ -123,14 +141,11 @@ class WebsocketPlugin(Plugin): } async with websocket_connect(url, **websocket_args) as ws: - return await self._ws_recv( - ws, timeout=timeout, num_messages=num_messages - ) + return await self._recv(ws, timeout=timeout, num_messages=num_messages) - loop = get_or_create_event_loop() - return loop.run_until_complete(recv()) + return self.loop.call_soon_threadsafe(recv) - async def _ws_recv(self, ws, timeout=0, num_messages=0): + async def _recv(self, ws, timeout=0, num_messages=0): messages = [] time_start = time.time() time_end = time_start + timeout if timeout else 0 @@ -166,6 +181,10 @@ class WebsocketPlugin(Plugin): return messages + @property + def _should_start_runner(self): + return bool(self._subscriptions) + @staticmethod def _parse_msg(msg): try: @@ -175,11 +194,18 @@ class WebsocketPlugin(Plugin): return msg + async def listen(self): + async def _recv(url): + async with websocket_connect(url) as ws: + return await self._recv(ws) + + await asyncio.wait([_recv(url) for url in set(self._subscriptions)]) + @staticmethod def _get_ssl_context( url: str, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None ): - if url.startswith('wss://'): + if url.startswith('wss://') or url.startswith('https://'): return get_ssl_client_context( ssl_cert=ssl_cert, ssl_key=ssl_key,