forked from platypush/platypush
ntfy plugin migrated to AsyncRunnablePlugin.
This commit removes a lot of the loop management boilerplate.
This commit is contained in:
parent
dba03d3e33
commit
770a14daae
1 changed files with 14 additions and 41 deletions
|
@ -1,8 +1,6 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import multiprocessing
|
|
||||||
import os
|
import os
|
||||||
import time
|
|
||||||
from typing import Optional, Collection, Mapping
|
from typing import Optional, Collection, Mapping
|
||||||
|
|
||||||
import requests
|
import requests
|
||||||
|
@ -11,11 +9,10 @@ import websockets.exceptions
|
||||||
|
|
||||||
from platypush.context import get_bus
|
from platypush.context import get_bus
|
||||||
from platypush.message.event.ntfy import NotificationEvent
|
from platypush.message.event.ntfy import NotificationEvent
|
||||||
from platypush.plugins import RunnablePlugin, action
|
from platypush.plugins import AsyncRunnablePlugin, action
|
||||||
from platypush.context import get_or_create_event_loop
|
|
||||||
|
|
||||||
|
|
||||||
class NtfyPlugin(RunnablePlugin):
|
class NtfyPlugin(AsyncRunnablePlugin):
|
||||||
"""
|
"""
|
||||||
Ntfy integration.
|
Ntfy integration.
|
||||||
|
|
||||||
|
@ -50,7 +47,6 @@ class NtfyPlugin(RunnablePlugin):
|
||||||
)
|
)
|
||||||
|
|
||||||
self._subscriptions = subscriptions or []
|
self._subscriptions = subscriptions or []
|
||||||
self._ws_proc = None
|
|
||||||
|
|
||||||
async def _get_ws_handler(self, url):
|
async def _get_ws_handler(self, url):
|
||||||
reconnect_wait_secs = 1
|
reconnect_wait_secs = 1
|
||||||
|
@ -60,7 +56,7 @@ class NtfyPlugin(RunnablePlugin):
|
||||||
self.logger.debug(f'Connecting to {url}')
|
self.logger.debug(f'Connecting to {url}')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with websockets.connect(url) as ws:
|
async with websockets.connect(url) as ws: # type: ignore
|
||||||
reconnect_wait_secs = 1
|
reconnect_wait_secs = 1
|
||||||
self.logger.info(f'Connected to {url}')
|
self.logger.info(f'Connected to {url}')
|
||||||
async for msg in ws:
|
async for msg in ws:
|
||||||
|
@ -91,45 +87,22 @@ class NtfyPlugin(RunnablePlugin):
|
||||||
)
|
)
|
||||||
except websockets.exceptions.WebSocketException as e:
|
except websockets.exceptions.WebSocketException as e:
|
||||||
self.logger.error('Websocket error: %s', e)
|
self.logger.error('Websocket error: %s', e)
|
||||||
time.sleep(reconnect_wait_secs)
|
await asyncio.sleep(reconnect_wait_secs)
|
||||||
reconnect_wait_secs = min(
|
reconnect_wait_secs = min(
|
||||||
reconnect_wait_secs * 2, reconnect_wait_secs_max
|
reconnect_wait_secs * 2, reconnect_wait_secs_max
|
||||||
)
|
)
|
||||||
|
|
||||||
def _ws_process(self):
|
async def listen(self):
|
||||||
loop = get_or_create_event_loop()
|
return await asyncio.wait(
|
||||||
try:
|
[
|
||||||
loop.run_until_complete(
|
self._get_ws_handler(f'{self._ws_url}/{sub}/ws')
|
||||||
asyncio.wait(
|
for sub in set(self._subscriptions)
|
||||||
{
|
]
|
||||||
self._get_ws_handler(f'{self._ws_url}/{sub}/ws')
|
)
|
||||||
for sub in self._subscriptions
|
|
||||||
}
|
|
||||||
)
|
|
||||||
)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def main(self):
|
@property
|
||||||
if self.should_stop() or (self._ws_proc and self._ws_proc.is_alive()):
|
def _should_start_runner(self):
|
||||||
self.logger.debug('Already connected')
|
return bool(self._subscriptions)
|
||||||
return
|
|
||||||
|
|
||||||
if self._subscriptions:
|
|
||||||
self._ws_proc = multiprocessing.Process(target=self._ws_process)
|
|
||||||
self._ws_proc.start()
|
|
||||||
|
|
||||||
self.wait_stop()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
if self._ws_proc and self._ws_proc.is_alive():
|
|
||||||
self._ws_proc.terminate()
|
|
||||||
try:
|
|
||||||
self._ws_proc.join(timeout=3)
|
|
||||||
except TimeoutError:
|
|
||||||
self._ws_proc.kill()
|
|
||||||
|
|
||||||
super().stop()
|
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def send_message(
|
def send_message(
|
||||||
|
|
Loading…
Reference in a new issue