Simplified backend interface, fixed some bugs with response management

This commit is contained in:
Fabio Manganiello 2017-12-16 04:56:43 +01:00
parent 97f2733308
commit 8c89a10710
5 changed files with 41 additions and 44 deletions

View file

@ -58,6 +58,7 @@ def _init_plugin(plugin_name, reload=False):
def _exec_func(args, backend=None, retry=True):
origin = args.pop('origin') if 'origin' in args else None
action = args.pop('action')
tokens = action.split('.')
module_name = str.join('.', tokens[:-1])
@ -79,14 +80,14 @@ def _exec_func(args, backend=None, retry=True):
response = Response(output=None, errors=[e, traceback.format_exc()])
logging.exception(e)
if retry:
# Put the action back where it was before retrying
args['action'] = action
# Put the popped args back where they were before retrying
args['action'] = action; args['origin'] = origin
logging.info('Reloading plugin {} and retrying'.format(module_name))
_init_plugin(module_name, reload=True)
_exec_func(args, backend, retry=False)
finally:
if backend: backend.send_msg({ 'response':str(response) })
if backend: backend.send_response(origin, response)
def on_msg(msg, backend=None):

View file

@ -1,3 +1,4 @@
import json
import logging
import sys
import platypush
@ -53,6 +54,10 @@ class Backend(Thread):
if target != self.device_id and not self.is_local():
return # Not for me
if 'response' in msg:
logging.info('Received response: {}'.format(msg))
return
if 'action' not in msg:
self.on_error('No action specified: {}'.format(msg))
return
@ -60,7 +65,24 @@ class Backend(Thread):
self.mq.put(msg)
def send_msg(self, msg):
raise NotImplementedError()
if isinstance(msg, str):
msg = json.loads(msg)
if not isinstance(msg, dict):
raise RuntimeError('send_msg expects either a JSON string or ' +
'a dictionary but received {}'.format(type(msg)))
msg['origin'] = self.device_id # To get the response
self._send_msg(msg)
def send_response(self, target, response):
self.send_msg({
'target' : target,
'response' : {
'output' : response.output,
'errors' : response.errors,
}
})
def run(self):
raise NotImplementedError()

View file

@ -9,7 +9,7 @@ class KafkaBackend(Backend):
def _init(self, server, topic):
self.server = server
self.topic_prefix = topic
self.topic = topic + '.' + self.device_id
self.topic = self._topic_by_device_id(self.device_id)
self.producer = None
def _on_record(self, record):
@ -27,18 +27,15 @@ class KafkaBackend(Backend):
if not self.producer:
self.producer = KafkaProducer(bootstrap_servers=self.server)
def send_msg(self, msg):
if isinstance(msg, str):
msg = json.loads(msg)
if isinstance(msg, dict):
def _topic_by_device_id(self, device_id):
return '{}.{}'.format(self.topic_prefix, device_id)
def _send_msg(self, msg):
target = msg['target']
msg = json.dumps(msg).encode('utf-8')
if not isinstance(msg, bytes):
msg = json.dumps(msg)
raise RuntimeError('Invalid non-JSON message')
self._init_producer()
self.producer.send(self.topic_prefix + '.' + target, msg)
self.producer.send(self._topic_by_device_id(target), msg)
self.producer.flush()
def run(self):

View file

@ -8,12 +8,9 @@ from .. import Backend
class LocalBackend(Backend):
def _init(self, fifo):
self.fifo = fifo
def send_msg(self, msg):
if isinstance(msg, dict):
msg = json.dumps(msg)
if not isinstance(msg, str):
raise RuntimeError('Invalid non-JSON message')
def _send_msg(self, msg):
msg = json.dumps(msg)
msglen = len(msg)+1 # Include \n
msg = bytearray((str(msglen) + '\n' + msg + '\n').encode('utf-8'))
with open(self.fifo, 'wb') as f:

View file

@ -12,14 +12,6 @@ class PushbulletBackend(Backend):
self.device_name = device
self.pb_device_id = self.get_device_id()
@staticmethod
def _on_init(ws):
logging.info('Connection opened')
@staticmethod
def _on_close(ws):
logging.info('Connection closed')
@staticmethod
def _on_msg(ws, msg):
ws.backend._on_push(msg)
@ -54,7 +46,6 @@ class PushbulletBackend(Backend):
else:
return {}
def _on_push(self, data):
try:
data = json.loads(data) if isinstance(data, str) else push
@ -82,10 +73,8 @@ class PushbulletBackend(Backend):
def _init_socket(self):
self.ws = websocket.WebSocketApp(
'wss://stream.pushbullet.com/websocket/' + self.token,
on_open = self._on_init,
on_message = self._on_msg,
on_error = self._on_error,
on_close = self._on_close)
on_error = self._on_error)
self.ws.backend = self
@ -104,26 +93,17 @@ class PushbulletBackend(Backend):
return devices[0]['iden']
def send_msg(self, msg):
if isinstance(msg, dict):
msg = json.dumps(msg)
if not isinstance(msg, str):
raise RuntimeError('Invalid non-JSON message')
response = requests.post(
def _send_msg(self, msg):
requests.post(
u'https://api.pushbullet.com/v2/pushes',
headers = { 'Access-Token': self.token },
json = {
'type': 'note',
'device_iden': self.pb_device_id,
'body': msg,
'body': json.dumps(msg),
}
).json()
if 'dismissed' not in response or response['dismissed'] is True:
raise RuntimeError('Error while pushing the message: {}'.
format(response))
def run(self):
self._init_socket()
logging.info('Initialized Pushbullet backend - device_id: {}'