ntfy plugin migrated to AsyncRunnablePlugin.

This commit removes a lot of the loop management boilerplate.
This commit is contained in:
Fabio Manganiello 2022-08-14 22:34:25 +02:00
parent 3b1ab78268
commit e17e65a703
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,8 +1,6 @@
import asyncio import asyncio
import json import json
import multiprocessing
import os import os
import time
from typing import Optional, Collection, Mapping from typing import Optional, Collection, Mapping
import requests import requests
@ -11,11 +9,10 @@ import websockets.exceptions
from platypush.context import get_bus from platypush.context import get_bus
from platypush.message.event.ntfy import NotificationEvent from platypush.message.event.ntfy import NotificationEvent
from platypush.plugins import RunnablePlugin, action from platypush.plugins import AsyncRunnablePlugin, action
from platypush.context import get_or_create_event_loop
class NtfyPlugin(RunnablePlugin): class NtfyPlugin(AsyncRunnablePlugin):
""" """
Ntfy integration. Ntfy integration.
@ -50,7 +47,6 @@ class NtfyPlugin(RunnablePlugin):
) )
self._subscriptions = subscriptions or [] self._subscriptions = subscriptions or []
self._ws_proc = None
async def _get_ws_handler(self, url): async def _get_ws_handler(self, url):
reconnect_wait_secs = 1 reconnect_wait_secs = 1
@ -60,7 +56,7 @@ class NtfyPlugin(RunnablePlugin):
self.logger.debug(f'Connecting to {url}') self.logger.debug(f'Connecting to {url}')
try: try:
async with websockets.connect(url) as ws: async with websockets.connect(url) as ws: # type: ignore
reconnect_wait_secs = 1 reconnect_wait_secs = 1
self.logger.info(f'Connected to {url}') self.logger.info(f'Connected to {url}')
async for msg in ws: async for msg in ws:
@ -91,45 +87,22 @@ class NtfyPlugin(RunnablePlugin):
) )
except websockets.exceptions.WebSocketException as e: except websockets.exceptions.WebSocketException as e:
self.logger.error('Websocket error: %s', e) self.logger.error('Websocket error: %s', e)
time.sleep(reconnect_wait_secs) await asyncio.sleep(reconnect_wait_secs)
reconnect_wait_secs = min( reconnect_wait_secs = min(
reconnect_wait_secs * 2, reconnect_wait_secs_max reconnect_wait_secs * 2, reconnect_wait_secs_max
) )
def _ws_process(self): async def listen(self):
loop = get_or_create_event_loop() return await asyncio.wait(
try: [
loop.run_until_complete( self._get_ws_handler(f'{self._ws_url}/{sub}/ws')
asyncio.wait( for sub in set(self._subscriptions)
{ ]
self._get_ws_handler(f'{self._ws_url}/{sub}/ws') )
for sub in self._subscriptions
}
)
)
except KeyboardInterrupt:
pass
def main(self): @property
if self.should_stop() or (self._ws_proc and self._ws_proc.is_alive()): def _should_start_runner(self):
self.logger.debug('Already connected') return bool(self._subscriptions)
return
if self._subscriptions:
self._ws_proc = multiprocessing.Process(target=self._ws_process)
self._ws_proc.start()
self.wait_stop()
def stop(self):
if self._ws_proc and self._ws_proc.is_alive():
self._ws_proc.terminate()
try:
self._ws_proc.join(timeout=3)
except TimeoutError:
self._ws_proc.kill()
super().stop()
@action @action
def send_message( def send_message(