From ef241b3769bc3225f1f4054d1f5628e7b89cca7b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 24 Dec 2018 12:31:38 +0100 Subject: [PATCH] More robust Pushbullet reconnection logic --- platypush/backend/pushbullet/__init__.py | 39 +++++++++++++++--------- platypush/context/__init__.py | 5 --- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index 561117d1..c0fe6e21 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -5,7 +5,7 @@ import time import websockets from platypush.config import Config -from platypush.context import create_event_loop +from platypush.context import get_or_create_event_loop from platypush.message import Message from platypush.message.event.pushbullet import PushbulletEvent @@ -31,6 +31,9 @@ class PushbulletBackend(Backend): * **websockets** (``pip install websockets``) """ + _PUSHBULLET_WS_URL = 'wss://stream.pushbullet.com/websocket/' + _WS_PING_INTERVAL = 10 + def __init__(self, token, device='Platypush', **kwargs): """ :param token: Your Pushbullet API token, see https://docs.pushbullet.com/#authentication @@ -133,29 +136,37 @@ class PushbulletBackend(Backend): def on_error(self, ws, e): self.logger.exception(e) - self.logger.info('Restarting PushBullet backend') - ws.close() - self._init_socket() def _init_socket(self): async def pushbullet_client(): - async with websockets.connect('wss://stream.pushbullet.com/websocket/' - + self.token) as self.ws: - while True: - try: - push = await self.ws.recv() - except Exception as e: - self.on_error(self.ws, e) - break + while True: + try: + self.logger.info('Connecting to Pushbullet websocket URL {}' + .format(self._PUSHBULLET_WS_URL)) - self.on_push(self.ws, push) + async with websockets.connect(self._PUSHBULLET_WS_URL + + self.token) as self.ws: + self.logger.info('Connection to Pushbullet successful') + + while True: + try: + push = await self.ws.recv() + except Exception as e: + self.logger.exception(e) + break + + self.on_push(self.ws, push) + except Exception as e: + self.logger.exception(e) self.close() + loop = None - loop = create_event_loop() + loop = get_or_create_event_loop() loop.run_until_complete(pushbullet_client()) loop.run_forever() + def create_device(self, name): return requests.post( u'https://api.pushbullet.com/v2/devices', diff --git a/platypush/context/__init__.py b/platypush/context/__init__.py index 8971ad38..6b46449f 100644 --- a/platypush/context/__init__.py +++ b/platypush/context/__init__.py @@ -114,11 +114,6 @@ def register_plugin(name, plugin, **kwargs): """ Registers a plugin instance by name """ global plugins -def create_event_loop(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - return loop - def get_or_create_event_loop(): try: loop = asyncio.get_event_loop()