From cff4563dae73b533b598ee9218bb8db437ffa184 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 27 Dec 2018 02:29:44 +0100 Subject: [PATCH] Rewritten Pushbullet backend using pushbullet.py for better stability --- platypush/backend/pushbullet/__init__.py | 233 +++++++---------------- platypush/plugins/pushbullet.py | 1 + requirements.txt | 3 + setup.py | 2 +- 4 files changed, 73 insertions(+), 166 deletions(-) diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index ee85f44e..864d79dd 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -1,16 +1,14 @@ -import asyncio import json import requests import time -import websockets +from pushbullet import Pushbullet, Listener + +from platypush.backend import Backend from platypush.config import Config -from platypush.context import get_or_create_event_loop from platypush.message import Message from platypush.message.event.pushbullet import PushbulletEvent -from .. import Backend - class PushbulletBackend(Backend): """ @@ -28,200 +26,100 @@ class PushbulletBackend(Backend): Requires: * **requests** (``pip install requests``) - * **websockets** (``pip install websockets``) + * **pushbullet.py** (``pip install git+https://github.com/rbrcsk/pushbullet.py``) """ - _PUSHBULLET_WS_URL = 'wss://stream.pushbullet.com/websocket/' - _WS_PING_INTERVAL = 10 - - def __init__(self, token, device='Platypush', **kwargs): + def __init__(self, token, device='Platypush', proxy_host=None, + proxy_port=None, **kwargs): """ - :param token: Your Pushbullet API token, see https://docs.pushbullet.com/#authentication + :param token: Your Pushbullet API token, see + https://docs.pushbullet.com/#authentication :type token: str :param device: Name of the virtual device for Platypush (default: Platypush) :type device: str + + :param proxy_host: HTTP proxy host (default: None) + :type proxy_host: str + + :param proxy_port: HTTP proxy port (default: None) + :type proxy_port: int """ super().__init__(**kwargs) self.token = token self.device_name = device + self.proxy_host = proxy_host + self.proxy_port = proxy_port + self.pb = Pushbullet(token) self.pb_device_id = self.get_device_id() - self.ws = None + self.listener = None + self.device = self.pb.get_device(self.device_name) - self._last_received_msg = { - 'request' : { 'body': None, 'time': None }, - 'response' : { 'body': None, 'time': None }, - 'event' : { 'body': None, 'time': None }, - } def _get_latest_push(self): t = int(time.time()) - 5 - try: - response = requests.get( - u'https://api.pushbullet.com/v2/pushes', - headers = { 'Access-Token': self.token }, - params = { - 'modified_after': str(t), - 'active' : 'true', - 'limit' : 1, - } - ) + pushes = self.pb.get_pushes(modified_after=str(t), limit=1) + return pushes[0] - response = response.json() - except Exception as e: - self.logger.exception(e) - raise e - - if 'pushes' in response and response['pushes']: - return response['pushes'][0] - else: - return {} - - def _should_skip_last_received_msg(self, msg): - if not isinstance(msg, dict): return True # We received something weird - - is_duplicate=False - last_msg = self._last_received_msg[msg['type']] - - if last_msg: - msg = Message.parse(msg) - if str(msg) == str(last_msg['body']) \ - and time.time() - last_msg['time'] <= 2: - # Duplicate message sent on the Pushbullet socket within - # two seconds, ignore it - self.logger.debug('Ignoring duplicate message received on the socket') - is_duplicate = True - - self._last_received_msg[msg['type']] = { - 'body': msg, 'time': time.time() - } - - return is_duplicate - - def on_push(self, ws, data): - try: - # Parse the push + def on_push(self): + def callback(data): try: - data = json.loads(data) if isinstance(data, str) else data + # Parse the push + try: + data = json.loads(data) if isinstance(data, str) else data + except Exception as e: + self.logger.exception(e) + return + + # If it's a push, get it + if data['type'] == 'tickle' and data['subtype'] == 'push': + push = self._get_latest_push() + elif data['type'] == 'push': + push = data['push'] + else: return # Not a push notification + + # Post an event, useful to react on mobile notifications if + # you enabled notification mirroring on your PushBullet app + event = PushbulletEvent(**push) + self.on_message(event) + + if 'body' not in push: return + self.logger.debug('Received push: {}'.format(push)) + + body = push['body'] + try: body = json.loads(body) + except ValueError as e: return # Some other non-JSON push + + self.on_message(body) except Exception as e: self.logger.exception(e) return - # If it's a push, get it - if data['type'] == 'tickle' and data['subtype'] == 'push': - push = self._get_latest_push() - elif data['type'] == 'push': - push = data['push'] - else: return # Not a push notification - - # Post an event, useful to react on mobile notifications if - # you enabled notification mirroring on your PushBullet app - event = PushbulletEvent(**push) - self.on_message(event) - - if 'body' not in push: return - self.logger.debug('Received push: {}'.format(push)) - - body = push['body'] - try: body = json.loads(body) - except ValueError as e: return # Some other non-JSON push - - if not self._should_skip_last_received_msg(body): - self.on_message(body) - except Exception as e: - self.logger.exception(e) - return - - def on_error(self, ws, e): - self.logger.exception(e) - - def _init_socket(self): - async def pushbullet_client(): - while True: - try: - self.logger.info('Connecting to Pushbullet websocket URL {}' - .format(self._PUSHBULLET_WS_URL)) - - async with websockets.connect(self._PUSHBULLET_WS_URL + - self.token) as self.ws: - self.logger.info('Connection to Pushbullet successful') - - while True: - try: - push = await self.ws.recv() - except Exception as e: - self.logger.warning('Disconnected from ' + - 'Pushbullet: {}'. - format(str(e))) - break - - self.on_push(self.ws, push) - except Exception as e: - self.logger.exception(e) - - self.close() - loop = None - - loop = get_or_create_event_loop() - loop.run_until_complete(pushbullet_client()) - loop.run_forever() - - - def create_device(self, name): - return requests.post( - u'https://api.pushbullet.com/v2/devices', - headers = { 'Access-Token': self.token }, - json = { - 'nickname': name, - 'model': 'Platypush virtual device', - 'manufactorer': 'platypush', - 'app_version': 8623, - 'icon': 'system', - } - ).json() - + return callback def get_device_id(self): - response = requests.get( - u'https://api.pushbullet.com/v2/devices', - headers = { 'Access-Token': self.token }, - ).json() - - devices = [dev for dev in response['devices'] if 'nickname' in dev - and dev['nickname'] == self.device_name] - - if devices: - return devices[0]['iden'] - try: - response = self.create_device(self.device_name) - if 'iden' not in response: - raise RuntimeError() + return self.pb.get_device(self.device_name).device_iden + except Exception as e: + device = self.pb.new_device(name, model='Platypush virtual device', + manufactorer='platypush', + app_version=8623, icon='system') self.logger.info('Created Pushbullet device {}'.format( self.device_name)) - return response['iden'] - except Exception as e: - self.logger.error('Unable to create Pushbillet device {}'. - format(self.device_name)) + return device.device_iden def send_message(self, msg): - requests.post( - u'https://api.pushbullet.com/v2/pushes', - headers = { 'Access-Token': self.token }, - json = { - 'type': 'note', - 'device_iden': self.pb_device_id, - 'body': str(msg) - } - ).json() + if isinstance(msg, dict): + msg = json.dumps(msg) + self.device.push_note(title=None, body=str(msg)) def close(self): - if self.ws: - self.ws.close() + if self.listener: + self.listener.close() def on_stop(self): return self.close() @@ -231,7 +129,12 @@ class PushbulletBackend(Backend): self.logger.info('Initialized Pushbullet backend - device_id: {}' .format(self.device_name)) - self._init_socket() + + self.listener = Listener(account=self.pb, on_push=self.on_push(), + http_proxy_host=self.proxy_host, + http_proxy_port=self.proxy_port) + + self.listener.run_forever() # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/pushbullet.py b/platypush/plugins/pushbullet.py index 8e4c53e2..70900c14 100644 --- a/platypush/plugins/pushbullet.py +++ b/platypush/plugins/pushbullet.py @@ -15,6 +15,7 @@ class PushbulletPlugin(Plugin): Requires: * **requests** (``pip install requests``) + * The :class:`platypush.backend.pushbullet.Pushbullet` backend enabled """ def __init__(self, *args, **kwargs): diff --git a/requirements.txt b/requirements.txt index 11dc3e81..2c72bcc6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,6 +11,9 @@ pyyaml # Apache Kafka backend support kafka-python +# Pushbullet backend support +git+https://github.com/rbrcsk/pushbullet.py + # HTTP backend support flask websockets diff --git a/setup.py b/setup.py index 5053cced..b3770a71 100755 --- a/setup.py +++ b/setup.py @@ -66,7 +66,7 @@ setup( ], extras_require = { 'Support for Apache Kafka backend': ['kafka-python'], - 'Support for Pushbullet backend': ['requests', 'websockets'], + 'Support for Pushbullet backend': ['requests', 'git+https://github.com/rbrcsk/pushbullet.py'], 'Support for HTTP backend': ['flask','websockets', 'python-dateutil'], 'Support for HTTP poll backend': ['frozendict'], 'Support for database plugin': ['sqlalchemy'],