Rewritten Pushbullet backend using pushbullet.py for better stability

This commit is contained in:
Fabio Manganiello 2018-12-27 02:29:44 +01:00
parent 56ff58d06b
commit cff4563dae
4 changed files with 73 additions and 166 deletions

View file

@ -1,16 +1,14 @@
import asyncio
import json import json
import requests import requests
import time import time
import websockets
from pushbullet import Pushbullet, Listener
from platypush.backend import Backend
from platypush.config import Config from platypush.config import Config
from platypush.context import get_or_create_event_loop
from platypush.message import Message from platypush.message import Message
from platypush.message.event.pushbullet import PushbulletEvent from platypush.message.event.pushbullet import PushbulletEvent
from .. import Backend
class PushbulletBackend(Backend): class PushbulletBackend(Backend):
""" """
@ -28,200 +26,100 @@ class PushbulletBackend(Backend):
Requires: Requires:
* **requests** (``pip install requests``) * **requests** (``pip install requests``)
* **websockets** (``pip install websockets``) * **pushbullet.py** (``pip install git+https://github.com/rbrcsk/pushbullet.py``)
""" """
_PUSHBULLET_WS_URL = 'wss://stream.pushbullet.com/websocket/' def __init__(self, token, device='Platypush', proxy_host=None,
_WS_PING_INTERVAL = 10 proxy_port=None, **kwargs):
def __init__(self, token, device='Platypush', **kwargs):
""" """
:param token: Your Pushbullet API token, see https://docs.pushbullet.com/#authentication :param token: Your Pushbullet API token, see
https://docs.pushbullet.com/#authentication
:type token: str :type token: str
:param device: Name of the virtual device for Platypush (default: Platypush) :param device: Name of the virtual device for Platypush (default: Platypush)
:type device: str :type device: str
:param proxy_host: HTTP proxy host (default: None)
:type proxy_host: str
:param proxy_port: HTTP proxy port (default: None)
:type proxy_port: int
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.token = token self.token = token
self.device_name = device self.device_name = device
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.pb = Pushbullet(token)
self.pb_device_id = self.get_device_id() self.pb_device_id = self.get_device_id()
self.ws = None self.listener = None
self.device = self.pb.get_device(self.device_name)
self._last_received_msg = {
'request' : { 'body': None, 'time': None },
'response' : { 'body': None, 'time': None },
'event' : { 'body': None, 'time': None },
}
def _get_latest_push(self): def _get_latest_push(self):
t = int(time.time()) - 5 t = int(time.time()) - 5
try: pushes = self.pb.get_pushes(modified_after=str(t), limit=1)
response = requests.get( return pushes[0]
u'https://api.pushbullet.com/v2/pushes',
headers = { 'Access-Token': self.token },
params = {
'modified_after': str(t),
'active' : 'true',
'limit' : 1,
}
)
response = response.json() def on_push(self):
except Exception as e: def callback(data):
self.logger.exception(e)
raise e
if 'pushes' in response and response['pushes']:
return response['pushes'][0]
else:
return {}
def _should_skip_last_received_msg(self, msg):
if not isinstance(msg, dict): return True # We received something weird
is_duplicate=False
last_msg = self._last_received_msg[msg['type']]
if last_msg:
msg = Message.parse(msg)
if str(msg) == str(last_msg['body']) \
and time.time() - last_msg['time'] <= 2:
# Duplicate message sent on the Pushbullet socket within
# two seconds, ignore it
self.logger.debug('Ignoring duplicate message received on the socket')
is_duplicate = True
self._last_received_msg[msg['type']] = {
'body': msg, 'time': time.time()
}
return is_duplicate
def on_push(self, ws, data):
try:
# Parse the push
try: try:
data = json.loads(data) if isinstance(data, str) else data # Parse the push
try:
data = json.loads(data) if isinstance(data, str) else data
except Exception as e:
self.logger.exception(e)
return
# If it's a push, get it
if data['type'] == 'tickle' and data['subtype'] == 'push':
push = self._get_latest_push()
elif data['type'] == 'push':
push = data['push']
else: return # Not a push notification
# Post an event, useful to react on mobile notifications if
# you enabled notification mirroring on your PushBullet app
event = PushbulletEvent(**push)
self.on_message(event)
if 'body' not in push: return
self.logger.debug('Received push: {}'.format(push))
body = push['body']
try: body = json.loads(body)
except ValueError as e: return # Some other non-JSON push
self.on_message(body)
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
return return
# If it's a push, get it return callback
if data['type'] == 'tickle' and data['subtype'] == 'push':
push = self._get_latest_push()
elif data['type'] == 'push':
push = data['push']
else: return # Not a push notification
# Post an event, useful to react on mobile notifications if
# you enabled notification mirroring on your PushBullet app
event = PushbulletEvent(**push)
self.on_message(event)
if 'body' not in push: return
self.logger.debug('Received push: {}'.format(push))
body = push['body']
try: body = json.loads(body)
except ValueError as e: return # Some other non-JSON push
if not self._should_skip_last_received_msg(body):
self.on_message(body)
except Exception as e:
self.logger.exception(e)
return
def on_error(self, ws, e):
self.logger.exception(e)
def _init_socket(self):
async def pushbullet_client():
while True:
try:
self.logger.info('Connecting to Pushbullet websocket URL {}'
.format(self._PUSHBULLET_WS_URL))
async with websockets.connect(self._PUSHBULLET_WS_URL +
self.token) as self.ws:
self.logger.info('Connection to Pushbullet successful')
while True:
try:
push = await self.ws.recv()
except Exception as e:
self.logger.warning('Disconnected from ' +
'Pushbullet: {}'.
format(str(e)))
break
self.on_push(self.ws, push)
except Exception as e:
self.logger.exception(e)
self.close()
loop = None
loop = get_or_create_event_loop()
loop.run_until_complete(pushbullet_client())
loop.run_forever()
def create_device(self, name):
return requests.post(
u'https://api.pushbullet.com/v2/devices',
headers = { 'Access-Token': self.token },
json = {
'nickname': name,
'model': 'Platypush virtual device',
'manufactorer': 'platypush',
'app_version': 8623,
'icon': 'system',
}
).json()
def get_device_id(self): def get_device_id(self):
response = requests.get(
u'https://api.pushbullet.com/v2/devices',
headers = { 'Access-Token': self.token },
).json()
devices = [dev for dev in response['devices'] if 'nickname' in dev
and dev['nickname'] == self.device_name]
if devices:
return devices[0]['iden']
try: try:
response = self.create_device(self.device_name) return self.pb.get_device(self.device_name).device_iden
if 'iden' not in response: except Exception as e:
raise RuntimeError() device = self.pb.new_device(name, model='Platypush virtual device',
manufactorer='platypush',
app_version=8623, icon='system')
self.logger.info('Created Pushbullet device {}'.format( self.logger.info('Created Pushbullet device {}'.format(
self.device_name)) self.device_name))
return response['iden'] return device.device_iden
except Exception as e:
self.logger.error('Unable to create Pushbillet device {}'.
format(self.device_name))
def send_message(self, msg): def send_message(self, msg):
requests.post( if isinstance(msg, dict):
u'https://api.pushbullet.com/v2/pushes', msg = json.dumps(msg)
headers = { 'Access-Token': self.token }, self.device.push_note(title=None, body=str(msg))
json = {
'type': 'note',
'device_iden': self.pb_device_id,
'body': str(msg)
}
).json()
def close(self): def close(self):
if self.ws: if self.listener:
self.ws.close() self.listener.close()
def on_stop(self): def on_stop(self):
return self.close() return self.close()
@ -231,7 +129,12 @@ class PushbulletBackend(Backend):
self.logger.info('Initialized Pushbullet backend - device_id: {}' self.logger.info('Initialized Pushbullet backend - device_id: {}'
.format(self.device_name)) .format(self.device_name))
self._init_socket()
self.listener = Listener(account=self.pb, on_push=self.on_push(),
http_proxy_host=self.proxy_host,
http_proxy_port=self.proxy_port)
self.listener.run_forever()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -15,6 +15,7 @@ class PushbulletPlugin(Plugin):
Requires: Requires:
* **requests** (``pip install requests``) * **requests** (``pip install requests``)
* The :class:`platypush.backend.pushbullet.Pushbullet` backend enabled
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):

View file

@ -11,6 +11,9 @@ pyyaml
# Apache Kafka backend support # Apache Kafka backend support
kafka-python kafka-python
# Pushbullet backend support
git+https://github.com/rbrcsk/pushbullet.py
# HTTP backend support # HTTP backend support
flask flask
websockets websockets

View file

@ -66,7 +66,7 @@ setup(
], ],
extras_require = { extras_require = {
'Support for Apache Kafka backend': ['kafka-python'], 'Support for Apache Kafka backend': ['kafka-python'],
'Support for Pushbullet backend': ['requests', 'websockets'], 'Support for Pushbullet backend': ['requests', 'git+https://github.com/rbrcsk/pushbullet.py'],
'Support for HTTP backend': ['flask','websockets', 'python-dateutil'], 'Support for HTTP backend': ['flask','websockets', 'python-dateutil'],
'Support for HTTP poll backend': ['frozendict'], 'Support for HTTP poll backend': ['frozendict'],
'Support for database plugin': ['sqlalchemy'], 'Support for database plugin': ['sqlalchemy'],