From 770a14daae4e3e7f58c449a4d60162644e4345a0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Aug 2022 22:34:25 +0200 Subject: [PATCH] ntfy plugin migrated to AsyncRunnablePlugin. This commit removes a lot of the loop management boilerplate. --- platypush/plugins/ntfy/__init__.py | 55 ++++++++---------------------- 1 file changed, 14 insertions(+), 41 deletions(-) diff --git a/platypush/plugins/ntfy/__init__.py b/platypush/plugins/ntfy/__init__.py index 8619df7a1..1d3faffbc 100644 --- a/platypush/plugins/ntfy/__init__.py +++ b/platypush/plugins/ntfy/__init__.py @@ -1,8 +1,6 @@ import asyncio import json -import multiprocessing import os -import time from typing import Optional, Collection, Mapping import requests @@ -11,11 +9,10 @@ import websockets.exceptions from platypush.context import get_bus from platypush.message.event.ntfy import NotificationEvent -from platypush.plugins import RunnablePlugin, action -from platypush.context import get_or_create_event_loop +from platypush.plugins import AsyncRunnablePlugin, action -class NtfyPlugin(RunnablePlugin): +class NtfyPlugin(AsyncRunnablePlugin): """ Ntfy integration. @@ -50,7 +47,6 @@ class NtfyPlugin(RunnablePlugin): ) self._subscriptions = subscriptions or [] - self._ws_proc = None async def _get_ws_handler(self, url): reconnect_wait_secs = 1 @@ -60,7 +56,7 @@ class NtfyPlugin(RunnablePlugin): self.logger.debug(f'Connecting to {url}') try: - async with websockets.connect(url) as ws: + async with websockets.connect(url) as ws: # type: ignore reconnect_wait_secs = 1 self.logger.info(f'Connected to {url}') async for msg in ws: @@ -91,45 +87,22 @@ class NtfyPlugin(RunnablePlugin): ) except websockets.exceptions.WebSocketException as e: self.logger.error('Websocket error: %s', e) - time.sleep(reconnect_wait_secs) + await asyncio.sleep(reconnect_wait_secs) reconnect_wait_secs = min( reconnect_wait_secs * 2, reconnect_wait_secs_max ) - def _ws_process(self): - loop = get_or_create_event_loop() - try: - loop.run_until_complete( - asyncio.wait( - { - self._get_ws_handler(f'{self._ws_url}/{sub}/ws') - for sub in self._subscriptions - } - ) - ) - except KeyboardInterrupt: - pass + async def listen(self): + return await asyncio.wait( + [ + self._get_ws_handler(f'{self._ws_url}/{sub}/ws') + for sub in set(self._subscriptions) + ] + ) - def main(self): - if self.should_stop() or (self._ws_proc and self._ws_proc.is_alive()): - self.logger.debug('Already connected') - return - - if self._subscriptions: - self._ws_proc = multiprocessing.Process(target=self._ws_process) - self._ws_proc.start() - - self.wait_stop() - - def stop(self): - if self._ws_proc and self._ws_proc.is_alive(): - self._ws_proc.terminate() - try: - self._ws_proc.join(timeout=3) - except TimeoutError: - self._ws_proc.kill() - - super().stop() + @property + def _should_start_runner(self): + return bool(self._subscriptions) @action def send_message(