diff --git a/platypush/__init__.py b/platypush/__init__.py index 8e4be1ef4c..7ea03fc028 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -58,6 +58,7 @@ def _init_plugin(plugin_name, reload=False): def _exec_func(args, backend=None, retry=True): + origin = args.pop('origin') if 'origin' in args else None action = args.pop('action') tokens = action.split('.') module_name = str.join('.', tokens[:-1]) @@ -79,14 +80,14 @@ def _exec_func(args, backend=None, retry=True): response = Response(output=None, errors=[e, traceback.format_exc()]) logging.exception(e) if retry: - # Put the action back where it was before retrying - args['action'] = action + # Put the popped args back where they were before retrying + args['action'] = action; args['origin'] = origin logging.info('Reloading plugin {} and retrying'.format(module_name)) _init_plugin(module_name, reload=True) _exec_func(args, backend, retry=False) finally: - if backend: backend.send_msg({ 'response':str(response) }) + if backend: backend.send_response(origin, response) def on_msg(msg, backend=None): diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 1e01972b51..5dd4975016 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -1,3 +1,4 @@ +import json import logging import sys import platypush @@ -53,6 +54,10 @@ class Backend(Thread): if target != self.device_id and not self.is_local(): return # Not for me + if 'response' in msg: + logging.info('Received response: {}'.format(msg)) + return + if 'action' not in msg: self.on_error('No action specified: {}'.format(msg)) return @@ -60,7 +65,24 @@ class Backend(Thread): self.mq.put(msg) def send_msg(self, msg): - raise NotImplementedError() + if isinstance(msg, str): + msg = json.loads(msg) + if not isinstance(msg, dict): + raise RuntimeError('send_msg expects either a JSON string or ' + + 'a dictionary but received {}'.format(type(msg))) + + msg['origin'] = self.device_id # To get the response + + self._send_msg(msg) + + def send_response(self, target, response): + self.send_msg({ + 'target' : target, + 'response' : { + 'output' : response.output, + 'errors' : response.errors, + } + }) def run(self): raise NotImplementedError() diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index c494351e5f..bf4db05760 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -9,7 +9,7 @@ class KafkaBackend(Backend): def _init(self, server, topic): self.server = server self.topic_prefix = topic - self.topic = topic + '.' + self.device_id + self.topic = self._topic_by_device_id(self.device_id) self.producer = None def _on_record(self, record): @@ -27,18 +27,15 @@ class KafkaBackend(Backend): if not self.producer: self.producer = KafkaProducer(bootstrap_servers=self.server) - def send_msg(self, msg): - if isinstance(msg, str): - msg = json.loads(msg) - if isinstance(msg, dict): - target = msg['target'] - msg = json.dumps(msg).encode('utf-8') - if not isinstance(msg, bytes): - msg = json.dumps(msg) - raise RuntimeError('Invalid non-JSON message') + def _topic_by_device_id(self, device_id): + return '{}.{}'.format(self.topic_prefix, device_id) + + def _send_msg(self, msg): + target = msg['target'] + msg = json.dumps(msg).encode('utf-8') self._init_producer() - self.producer.send(self.topic_prefix + '.' + target, msg) + self.producer.send(self._topic_by_device_id(target), msg) self.producer.flush() def run(self): diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py index 9cba0581eb..85a2818768 100644 --- a/platypush/backend/local/__init__.py +++ b/platypush/backend/local/__init__.py @@ -8,12 +8,9 @@ from .. import Backend class LocalBackend(Backend): def _init(self, fifo): self.fifo = fifo - def send_msg(self, msg): - if isinstance(msg, dict): - msg = json.dumps(msg) - if not isinstance(msg, str): - raise RuntimeError('Invalid non-JSON message') + def _send_msg(self, msg): + msg = json.dumps(msg) msglen = len(msg)+1 # Include \n msg = bytearray((str(msglen) + '\n' + msg + '\n').encode('utf-8')) with open(self.fifo, 'wb') as f: diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index b15a9b6397..6cd6d8f3d2 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -12,14 +12,6 @@ class PushbulletBackend(Backend): self.device_name = device self.pb_device_id = self.get_device_id() - @staticmethod - def _on_init(ws): - logging.info('Connection opened') - - @staticmethod - def _on_close(ws): - logging.info('Connection closed') - @staticmethod def _on_msg(ws, msg): ws.backend._on_push(msg) @@ -54,7 +46,6 @@ class PushbulletBackend(Backend): else: return {} - def _on_push(self, data): try: data = json.loads(data) if isinstance(data, str) else push @@ -82,10 +73,8 @@ class PushbulletBackend(Backend): def _init_socket(self): self.ws = websocket.WebSocketApp( 'wss://stream.pushbullet.com/websocket/' + self.token, - on_open = self._on_init, on_message = self._on_msg, - on_error = self._on_error, - on_close = self._on_close) + on_error = self._on_error) self.ws.backend = self @@ -104,26 +93,17 @@ class PushbulletBackend(Backend): return devices[0]['iden'] - def send_msg(self, msg): - if isinstance(msg, dict): - msg = json.dumps(msg) - if not isinstance(msg, str): - raise RuntimeError('Invalid non-JSON message') - - response = requests.post( + def _send_msg(self, msg): + requests.post( u'https://api.pushbullet.com/v2/pushes', headers = { 'Access-Token': self.token }, json = { 'type': 'note', 'device_iden': self.pb_device_id, - 'body': msg, + 'body': json.dumps(msg), } ).json() - if 'dismissed' not in response or response['dismissed'] is True: - raise RuntimeError('Error while pushing the message: {}'. - format(response)) - def run(self): self._init_socket() logging.info('Initialized Pushbullet backend - device_id: {}'