From bd5c80175f0eace23951e88de88a87438205d039 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 20 Dec 2017 20:25:08 +0100 Subject: [PATCH] - Major refactoring. - More consistent naming for many methods, plus added a more extensive doc. - Refactored the entry points for the daemon and the pusher into two classes, easier to encapsulate and wrap into tests. - Removed the local backend - managing the concurrency of two processes reading and writing on the same socket at the same time was too much, and its utility outside of the tests (which can have mock backends as well) is quite modest. - Managing stop events on the bus. Still some work to do tho. - Fixed several bugs. --- README.md | 12 +- platypush/__init__.py | 178 +++++++++++++------ platypush/backend/__init__.py | 82 +++++---- platypush/backend/kafka/__init__.py | 30 +++- platypush/backend/local/__init__.py | 48 ----- platypush/backend/pushbullet/__init__.py | 64 +++---- platypush/bus/__init__.py | 42 +++-- platypush/config.example.yaml | 3 - platypush/message/__init__.py | 2 +- platypush/message/event/__init__.py | 112 ++++++++++++ platypush/message/request/__init__.py | 2 +- platypush/message/response/__init__.py | 2 +- platypush/plugins/shell/__init__.py | 6 +- platypush/pusher/__init__.py | 212 ++++++++++++++++------- platypush/utils/__init__.py | 64 ++++++- 15 files changed, 569 insertions(+), 290 deletions(-) delete mode 100644 platypush/backend/local/__init__.py create mode 100644 platypush/message/event/__init__.py diff --git a/README.md b/README.md index f435fa8065..af22d8df0e 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,6 @@ Edit the file to include: * The host and port of the Kafka installation * The topic that will be used to deliver and process messages -### For the local socket backend - -* The name of the local FIFO that will be used to deliver and process messages - ### device_id Each target device is identified by a unique device_id in the messages sent over your account. The device_id is the hostname by default, unless changed in config.yaml. @@ -161,9 +157,9 @@ You can also write your own backends, where a backend is nothing but a thread th 5. The configuration for your module will be read from a section named `backend.voicemail` from your `config.yaml`. Its values will be passed over the backend constructor arguments. -6. Implement the `run` method. Since a backend is a thread that polls for new messages on a channel, this will be the thread main method. `_send_msg` should call `self.on_msg` at the end to post a new message to the application. +6. Implement the `run` method. Since a backend is a thread that polls for new messages on a channel, this will be the thread main method. `send_message` should call `self.on_message` at the end to post a new message to the application. -7. Implement the `_send_msg` method. This method will be called whenever the application needs to send a new message through `send_request` and `send_response`. You should never call `_send_msg` directly. +7. Implement the `send_message` method. This method will be called whenever the application needs to send a new message through `send_request` and `send_response`. You should never call `send_message` directly. The `__init__.py` will look like this: @@ -176,12 +172,12 @@ class VoicemailBackend(Backend) self.phone = phone self.voicemail = Voicemail(...) - def _send_msg(self, msg): + def send_message(self, msg): self.voicemail.save_msg(msg) def run(self): while True: msg = self.voicemail.poll() - self.on_msg(msg) + self.on_message(msg) ``` diff --git a/platypush/__init__.py b/platypush/__init__.py index 903a4360a3..af0e3d524b 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -1,3 +1,4 @@ +import argparse import logging import sys import traceback @@ -7,7 +8,8 @@ from getopt import getopt from .bus import Bus from .config import Config -from .utils import get_or_load_plugin, init_backends +from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action +from .message.event import Event, StopEvent from .message.request import Request from .message.response import Response @@ -16,72 +18,136 @@ __version__ = '0.4' #-----------# -def _execute_request(request, retry=True): - tokens = request.action.split('.') - module_name = str.join('.', tokens[:-1]) - method_name = tokens[-1:][0] +class Daemon(object): + """ Main class for the Platypush daemon """ - try: - plugin = get_or_load_plugin(module_name) - except RuntimeError as e: # Module/class not found - logging.exception(e) - return + """ Configuration file (default: either ~/.config/platypush/config.yaml or + /etc/platypush/config.yaml) """ + config_file = None + + """ Application bus. It's an internal queue where: + - backends will post the messages they receive + - plugins will post the responses they process """ + bus = None + + """ backend_name => backend_obj map """ + backends = None + + """ number of executions retries before a request fails """ + n_tries = 2 + + def __init__(self, config_file=None): + """ Constructor + Params: + config_file -- Configuration file override (default: None) + """ + + self.config_file = config_file + Config.init(self.config_file) + logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) + + @classmethod + def build_from_cmdline(cls, args): + """ Build the app from command line arguments. + Params: + args -- Your sys.argv[1:] [List of strings] + """ + parser = argparse.ArgumentParser() + parser.add_argument('--config', '-c', dest='config', required=False, + default=None, help=cls.config_file.__doc__) + + opts, args = parser.parse_known_args(args) + return cls(config_file=opts.config) + + def on_message(self): + """ Default message handler """ + + def _f(msg): + """ on_message closure + Params: + msg -- platypush.message.Message instance """ + if isinstance(msg, Request): + logging.info('Processing request: {}'.format(msg)) + Thread(target=self.run_request(), args=(msg,)).start() + elif isinstance(msg, Response): + logging.info('Received response: {}'.format(msg)) + + return _f + + def send_response(self, request, response): + """ Sends a response back. + Params: + request -- The platypush.message.request.Request object + response -- The platypush.message.response.Response object """ - try: - response = plugin.run(method=method_name, **request.args) - if response and response.is_error(): - logging.warn('Response processed with errors: {}'.format(response)) - except Exception as e: - response = Response(output=None, errors=[str(e), traceback.format_exc()]) - logging.exception(e) - if retry: - logging.info('Reloading plugin {} and retrying'.format(module_name)) - get_or_load_plugin(module_name, reload=True) - _execute_request(request, retry=False) - finally: - # Send the response on the backend that received the request if request.backend and request.origin: if request.id: response.id = request.id response.target = request.origin logging.info('Processing response: {}'.format(response)) request.backend.send_response(response) + else: + logging.info('Ignoring response as the request has no backend: ' + .format(request)) + + def run_request(self): + """ Runs a request and returns the response """ + def _thread_func(request, n_tries=self.n_tries): + """ Thread closure method + Params: + request - platypush.message.request.Request object """ + + (module_name, method_name) = get_module_and_name_from_action(request.action) + + try: + plugin = get_or_load_plugin(module_name) + except RuntimeError as e: # Module/class not found + logging.exception(e) + return + + try: + # Run the action + response = plugin.run(method=method_name, **request.args) + if response and response.is_error(): + logging.warning('Response processed with errors: {}'.format(response)) + except Exception as e: # Retry mechanism + response = Response(output=None, errors=[str(e), traceback.format_exc()]) + logging.exception(e) + if n_tries: + logging.info('Reloading plugin {} and retrying'.format(module_name)) + get_or_load_plugin(module_name, reload=True) + _thread_func(request, n_tries=n_tries-1) + finally: + # Send the response on the backend that received the request + self.send_response(request, response) + + return _thread_func + + def start(self): + """ Start the daemon """ + self.bus = Bus(on_message=self.on_message()) + + # Initialize the backends and link them to the bus + self.backends = init_backends(self.bus) + + # Start the backend threads + for backend in self.backends.values(): + backend.start() + + # Poll for messages on the bus + try: + self.bus.poll() + except KeyboardInterrupt as e: + logging.info('SIGINT received, terminating application') + + for backend in self.backends.values(): + backend.stop() -def on_msg(msg): - if isinstance(msg, Request): - logging.info('Processing request: {}'.format(msg)) - Thread(target=_execute_request, args=(msg,)).start() - elif isinstance(msg, Response): - logging.info('Received response: {}'.format(msg)) - - -def main(): +def main(args=sys.argv[1:]): print('Starting platypush v.{}'.format(__version__)) - config_file = None - - optlist, args = getopt(sys.argv[1:], 'vh') - for opt, arg in optlist: - if opt == '-c': - config_file = arg - elif opt == '-h': - print(''' -Usage: {} [-h] [-c ] - -h Show this help - -c Path to the configuration file (default: ./config.yaml) -'''.format(sys.argv[0])) - return - - Config.init(config_file) - logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) - - bus = Bus(on_msg=on_msg) - backends = init_backends(bus) - - for backend in backends.values(): - backend.start() - - bus.loop_forever() + app = Daemon.build_from_cmdline(args) + app.start() if __name__ == '__main__': main() diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index a8e3361bd7..3d4dac4273 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -1,12 +1,15 @@ import importlib import logging import sys +import threading from threading import Thread from platypush.bus import Bus from platypush.config import Config +from platypush.utils import get_message_class_by_type from platypush.message import Message +from platypush.message.event import Event, StopEvent from platypush.message.request import Request from platypush.message.response import Response @@ -25,43 +28,16 @@ class Backend(Thread): # the received messages will be pushed self.bus = bus if bus else Bus() self.device_id = Config.get('device_id') - self.msgtypes = {} + self.thread_id = None + self._stop = False Thread.__init__(self) logging.basicConfig(stream=sys.stdout, level=Config.get('logging') if 'logging' not in kwargs else getattr(logging, kwargs['logging'])) - def is_local(self): - """ Returns true if this is a local backend """ - from platypush.backend.local import LocalBackend - return isinstance(self, LocalBackend) - def _get_msgtype_class(self, msgtype): - """ Gets the class of a message type """ - - if msgtype in self.msgtypes: return self.msgtypes[msgtype] - - try: - module = importlib.import_module('platypush.message.' + msgtype) - except ModuleNotFoundError as e: - logging.warn('Unsupported message type {}'.format(msgtype)) - raise RuntimeError(e) - - cls_name = msgtype[0].upper() + msgtype[1:] - - try: - msgclass = getattr(module, cls_name) - self.msgtypes[msgtype] = msgclass - except AttributeError as e: - logging.warn('No such class in {}: {}'.format( - module.__name__, cls_name)) - raise RuntimeError(e) - - return msgclass - - - def on_msg(self, msg): + def on_message(self, msg): """ Callback when a message is received on the backend. It parses and posts the message on the main bus. @@ -76,18 +52,26 @@ class Backend(Thread): msg = Message.parse(msg) if 'type' not in msg: - logging.warn('Ignoring message with no type: {}'.format(msg)) + logging.warning('Ignoring message with no type: {}'.format(msg)) return - msgtype = self._get_msgtype_class(msg['type']) + msgtype = get_message_class_by_type(msg['type']) msg = msgtype.build(msg) - if not getattr(msg, 'target') or (msg.target != self.device_id and not self.is_local()): + if not getattr(msg, 'target') or msg.target != self.device_id: return # Not for me - logging.debug('Message received on the backend: {}'.format(msg)) + logging.info('Message received on the {} backend: {}'.format( + self.__class__.__name__, msg)) + msg.backend = self # Augment message - self.bus.post(msg) + + if isinstance(msg, StopEvent) and msg.targets_me(): + logging.info('Received STOP event on the {} backend: {}'.format( + self.__class__.__name__, msg)) + self._stop = True + else: + self.bus.post(msg) def send_request(self, request): """ @@ -101,7 +85,7 @@ class Backend(Thread): assert isinstance(request, Request) request.origin = self.device_id - self._send_msg(request) + self.send_message(request) def send_response(self, response): """ @@ -115,27 +99,39 @@ class Backend(Thread): assert isinstance(response, Response) response.origin = self.device_id - self._send_msg(response) + self.send_message(response) - def _send_msg(self, msg): + + def send_message(self, msg): """ Sends a platypush.message.Message to a node. To be implemented in the derived classes. - Always call send_request or send_response instead of _send_msg directly + Always call send_request or send_response instead of send_message directly Param: msg -- The message """ - raise NotImplementedError("_send_msg should be implemented in a derived class") + raise NotImplementedError("send_message should be implemented in a derived class") def run(self): """ Starts the backend thread. To be implemented in the derived classes """ - raise NotImplementedError("run should be implemented in a derived class") + self.thread_id = threading.get_ident() + + def on_stop(self): + """ Callback invoked when the process stops """ + pass def stop(self): - """ Stops the backend thread (default: do nothing) """ - pass + """ Stops the backend thread by sending a STOP event on its bus """ + evt = StopEvent(target=self.device_id, origin=self.device_id, + thread_id=self.thread_id) + + self.send_message(evt) + self.on_stop() + + def should_stop(self): + return self._stop # vim:sw=4:ts=4:et: diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index bb8d834193..91acb836ad 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -1,11 +1,14 @@ import logging import json +import time from kafka import KafkaConsumer, KafkaProducer from .. import Backend class KafkaBackend(Backend): + _conn_retry_secs = 5 + def __init__(self, server, topic, **kwargs): super().__init__(**kwargs) @@ -23,7 +26,7 @@ class KafkaBackend(Backend): logging.exception(e) logging.debug('Received message: {}'.format(msg)) - self.on_msg(msg) + self.on_message(msg) def _init_producer(self): if not self.producer: @@ -32,7 +35,7 @@ class KafkaBackend(Backend): def _topic_by_device_id(self, device_id): return '{}.{}'.format(self.topic_prefix, device_id) - def _send_msg(self, msg): + def send_message(self, msg): target = msg.target msg = str(msg).encode('utf-8') @@ -40,13 +43,32 @@ class KafkaBackend(Backend): self.producer.send(self._topic_by_device_id(target), msg) self.producer.flush() + def on_stop(self): + try: + if self.producer: + self.producer.flush() + self.producer.close() + + if self.consumer: + self.consumer.close() + except: pass + def run(self): + super().run() + self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) logging.info('Initialized kafka backend - server: {}, topic: {}' .format(self.server, self.topic)) - for msg in self.consumer: - self._on_record(msg) + try: + for msg in self.consumer: + self._on_record(msg) + if self.should_stop(): + break + except ConnectionError: + logging.warning('Kafka connection error, retrying in {} seconds'. + format(self._conn_retry_secs)) + time.sleep(self._conn_retry_secs) # vim:sw=4:ts=4:et: diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py deleted file mode 100644 index 6de045e5cd..0000000000 --- a/platypush/backend/local/__init__.py +++ /dev/null @@ -1,48 +0,0 @@ -import logging -import json -import os -import time - -from .. import Backend - -class LocalBackend(Backend): - def __init__(self, fifo, **kwargs): - super().__init__(**kwargs) - - self.fifo = fifo - - def _send_msg(self, msg): - msglen = len(str(msg))+1 # Include \n - - # Message format: b"\n\n" - msg = bytearray((str(msglen) + '\n' + str(msg) + '\n').encode('utf-8')) - with open(self.fifo, 'wb') as f: - f.write(msg) - - def run(self): - try: os.mkfifo(self.fifo) - except FileExistsError as e: pass - logging.info('Initialized local backend on fifo {}'.format(self.fifo)) - - with open(self.fifo, 'rb', 0) as f: - while True: - try: - msglen = int(f.readline()) - except ValueError as e: - time.sleep(0.1) - continue - - msg = f.read(msglen-1) - if not msg: continue - - try: - msg = json.loads(msg.decode('utf-8')) - except Exception as e: - logging.exception(e) - continue - - logging.debug('Received message: {}'.format(msg)) - self.on_msg(msg) - -# vim:sw=4:ts=4:et: - diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index cddf5a8d96..9ec630d9e9 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -18,8 +18,9 @@ class PushbulletBackend(Backend): self.pb_device_id = self.get_device_id() self._last_received_msg = { - 'request': { 'body': None, 'time': None }, - 'response': { 'body': None, 'time': None }, + 'request' : { 'body': None, 'time': None }, + 'response' : { 'body': None, 'time': None }, + 'event' : { 'body': None, 'time': None }, } def _get_latest_push(self): @@ -64,51 +65,49 @@ class PushbulletBackend(Backend): return is_duplicate - @staticmethod - def _on_msg(backend): + def on_push(self): def _f(ws, data): try: - data = json.loads(data) if isinstance(data, str) else push + try: + data = json.loads(data) if isinstance(data, str) else push + except Exception as e: + logging.exception(e) + return + + if data['type'] == 'tickle' and data['subtype'] == 'push': + push = self._get_latest_push() + elif data['type'] == 'push': + push = data['push'] + else: return # Not a push notification + + logging.debug('Received push: {}'.format(push)) + + body = push['body'] + try: body = json.loads(body) + except ValueError as e: return # Some other non-JSON push + + if not self._should_skip_last_received_msg(body): + self.on_message(body) except Exception as e: logging.exception(e) return - if data['type'] == 'tickle' and data['subtype'] == 'push': - push = backend._get_latest_push() - elif data['type'] == 'push': - push = data['push'] - else: - return # Not a push notification - - logging.debug('Received push: {}'.format(push)) - - if 'body' not in push: return - - body = push['body'] - try: body = json.loads(body) - except ValueError as e: return - - if not backend._should_skip_last_received_msg(body): - backend.on_msg(body) - return _f - @staticmethod - def _on_error(backend): + def on_error(self): def _f(ws, e): logging.exception(e) logging.info('Restarting PushBullet backend') ws.close() - backend._init_socket() + self._init_socket() return _f def _init_socket(self): self.ws = websocket.WebSocketApp( 'wss://stream.pushbullet.com/websocket/' + self.token, - # on_message = self._on_msg, - on_message = self._on_msg(self), - on_error = self._on_error(self)) + on_message = self.on_push(), + on_error = self.on_error()) def get_device_id(self): response = requests.get( @@ -125,7 +124,7 @@ class PushbulletBackend(Backend): return devices[0]['iden'] - def _send_msg(self, msg): + def send_message(self, msg): requests.post( u'https://api.pushbullet.com/v2/pushes', headers = { 'Access-Token': self.token }, @@ -136,7 +135,12 @@ class PushbulletBackend(Backend): } ).json() + def on_stop(self): + self.ws.close() + def run(self): + super().run() + self._init_socket() logging.info('Initialized Pushbullet backend - device_id: {}' .format(self.device_name)) diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 52010c17fd..c2c53ce576 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -3,18 +3,17 @@ import sys import signal import logging +from enum import Enum from queue import Queue +from platypush.message.event import Event, StopEvent + class Bus(object): """ Main local bus where the daemon will listen for new messages """ - """ Number of seconds to wait for any pending threads - before the process returns to the OS """ - _kill_sec_timeout = 5 - - def __init__(self, on_msg=None): + def __init__(self, on_message=None): self.bus = Queue() - self.on_msg = on_msg + self.on_message = on_message def post(self, msg): """ Sends a message to the bus """ @@ -24,25 +23,24 @@ class Bus(object): """ Reads one message from the bus """ return self.bus.get() - def loop_forever(self): - """ Reads messages from the bus until KeyboardInterrupt """ - def _on_stop_timeout(signum, frame): - logging.warn('Stopping all the active threads after waiting for ' + - '{} seconds'.format(self._kill_sec_timeout)) - os._exit(1) + def poll(self): + """ + Reads messages from the bus until either stop event message or KeyboardInterrupt + """ - if not self.on_msg: return + if not self.on_message: + logging.warning('No message handlers installed, cannot poll') + return - while True: - try: - self.on_msg(self.get()) - except KeyboardInterrupt: - logging.info('Received keyboard interrupt ' + - '- terminating application') + stop=False + while not stop: + msg = self.get() + self.on_message(msg) + + if isinstance(msg, StopEvent) and msg.targets_me(): + logging.info('Received STOP event') + stop=True - signal.signal(signal.SIGALRM, _on_stop_timeout) - signal.alarm(self._kill_sec_timeout) - sys.exit(0) # vim:sw=4:ts=4:et: diff --git a/platypush/config.example.yaml b/platypush/config.example.yaml index 932f4837c5..706ca35aaf 100644 --- a/platypush/config.example.yaml +++ b/platypush/config.example.yaml @@ -11,9 +11,6 @@ backend.pushbullet: token: your_pushbullet_token_here device: your_pushbullet_virtual_device_name -backend.local: - fifo: /tmp/platypush.fifo - # device_id: (default: current hostname) # debug: True (default: False) diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 0dc2bcf16c..1427aa5320 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -38,7 +38,7 @@ class Message(object): if isinstance(msg, bytes) or isinstance(msg, bytearray): msg = msg.decode('utf-8') if isinstance(msg, str): - msg = json.loads(msg) + msg = json.loads(msg.strip()) assert isinstance(msg, dict) return msg diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py new file mode 100644 index 0000000000..973bc5a853 --- /dev/null +++ b/platypush/message/event/__init__.py @@ -0,0 +1,112 @@ +import json +import random +import threading + +from enum import Enum + +from platypush.message import Message + +class Event(Message): + """ Event message class """ + + def __init__(self, target, type, origin, id=None, **kwargs): + """ + Params: + target -- Target node [String] + type -- Event type [EventType] + origin -- Origin node (default: current node) [String] + id -- Event ID (default: auto-generated) + kwargs -- Additional arguments for the event [kwDict] + """ + + self.id = id if id else self._generate_id() + self.target = target + self.origin = origin + self.type = type + self.args = kwargs + + @classmethod + def build(cls, msg): + """ Builds an event message from a JSON UTF-8 string/bytearray, a + dictionary, or another Event """ + + msg = super().parse(msg) + event_type = msg['args'].pop('type') + event_class = getattr(EventType, event_type).cls + + args = { + 'target' : msg['target'], + 'origin' : msg['origin'], + **(msg['args'] if 'args' in msg else {}), + } + + args['id'] = msg['id'] if 'id' in msg else cls._generate_id() + return event_class(**args) + + @staticmethod + def _generate_id(): + """ Generate a unique event ID """ + id = '' + for i in range(0,16): + id += '%.2x' % random.randint(0, 255) + return id + + def __str__(self): + """ + Overrides the str() operator and converts + the message into a UTF-8 JSON string + """ + + return json.dumps({ + 'type' : 'event', + 'target' : self.target, + 'origin' : self.origin if hasattr(self, 'origin') else None, + 'id' : self.id if hasattr(self, 'id') else None, + 'args' : { + 'type' : self.type.name, + **self.args, + }, + }) + + +class StopEvent(Event): + """ StopEvent message. When received on a Bus, it will terminate the + listening thread having the specified ID. Useful to keep listeners in + sync and make them quit when the application terminates """ + + def __init__(self, target, origin, thread_id, id=None, **kwargs): + """ Constructor. + Params: + target -- Target node + origin -- Origin node + thread_id -- thread_iden() to be terminated if listening on the bus + id -- Event ID (default: auto-generated) + kwargs -- Extra key-value arguments + """ + + super().__init__(target=target, origin=origin, id=id, + type=EventType.STOP, thread_id=thread_id, **kwargs) + + def targets_me(self): + """ Returns true if the stop event is for the current thread """ + return self.args['thread_id'] == threading.get_ident() + + +class EventType(Enum): + """ Event types enum """ + + def __new__(cls, *args, **kwds): + value = len(cls.__members__) + 1 + obj = object.__new__(cls) + obj._value_ = value + return obj + + def __init__(self, label, cls): + self.label = label + self.cls = cls + + STOP = 'STOP', StopEvent + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index f0083c3241..464f12770c 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -33,7 +33,7 @@ class Request(Message): args['id'] = msg['id'] if 'id' in msg else cls._generate_id() if 'origin' in msg: args['origin'] = msg['origin'] - return Request(**args) + return cls(**args) @staticmethod def _generate_id(): diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index a7156eff63..3f865bb262 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -36,7 +36,7 @@ class Response(Message): if 'id' in msg: args['id'] = msg['id'] if 'origin' in msg: args['origin'] = msg['origin'] - return Response(**args) + return cls(**args) def __str__(self): diff --git a/platypush/plugins/shell/__init__.py b/platypush/plugins/shell/__init__.py index f0b09a7f2a..27bc8d098d 100644 --- a/platypush/plugins/shell/__init__.py +++ b/platypush/plugins/shell/__init__.py @@ -7,15 +7,15 @@ from .. import Plugin class ShellPlugin(Plugin): def exec(self, cmd): output = None - error = None + errors = [] try: output = subprocess.check_output( cmd, stderr=subprocess.STDOUT, shell=True).decode('utf-8') except subprocess.CalledProcessError as e: - error = e.output.decode('utf-8') + errors = [e.output.decode('utf-8')] - return Response(output=output, errors=[error]) + return Response(output=output, errors=errors) # vim:sw=4:ts=4:et: diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py index a0c45b1842..fca8dde353 100755 --- a/platypush/pusher/__init__.py +++ b/platypush/pusher/__init__.py @@ -1,100 +1,180 @@ import argparse import os import re -import signal import sys from platypush.bus import Bus from platypush.config import Config from platypush.message.request import Request from platypush.message.response import Response -from platypush.utils import init_backends +from platypush.utils import init_backends, set_timeout, clear_timeout -_DEFAULT_TIMEOUT_SEC=5 +class Pusher(object): + """ + Main class to send messages and events to a node + """ -def pusher(target, action, backend=None, config=None, - timeout=_DEFAULT_TIMEOUT_SEC, **kwargs): - def on_timeout(signum, frame): - raise RuntimeError('Response timed out after {} seconds'.format( - timeout)) - os._exit(0) + """ Configuration file path """ + config_file = None - Config.init(config) + """ Default backend name """ + backend = None - if target == 'localhost': - backend = 'local' - elif not backend: - backend = Config.get_default_pusher_backend() + """ Pusher local bus. The response will be processed here """ + bus = None - req = Request.build({ - 'target' : target, - 'action' : action, - 'args' : kwargs, - }) + """ Configured backends as a name => object map """ + backends = {} - bus = Bus() - backends = init_backends(bus=bus) - if backend not in backends: - raise RuntimeError('No such backend configured: {}'.format(backend)) + """ Default response_wait timeout """ + default_response_wait_timeout = 5 - b = backends[backend] - b.start() - b.send_request(req) - if timeout: - signal.signal(signal.SIGALRM, on_timeout) - signal.alarm(timeout) + def __init__(self, config_file=None, backend=None, on_response=None): + """ + Constructor. + Params: + config_file -- Path to the configuration file - default: + ~/.config/platypush/config.yaml or + /etc/platypush/config.yaml) + backend -- Name of the backend where pusher will send the + request and wait for the response (kafka + or pushbullet). Default: whatever is specified + with pusher=true in your configuration file + on_response -- Method that will be invoked upon response receipt. + Takes a platypush.message.response.Response as arg. + Default: print the response and exit. + """ + # Initialize the configuration + self.config_file = config_file + Config.init(config_file) + + self.on_response = on_response or self.default_on_response() + self.backend = backend or Config.get_default_pusher_backend() + self.bus = Bus() + + + @classmethod + def parse_build_args(cls, args): + """ Parse the recognized options from a list of cmdline arguments """ + parser = argparse.ArgumentParser() + parser.add_argument('--config', '-c', dest='config', required=False, + default=None, help="Configuration file path (default: " + + "~/.config/platypush/config.yaml or " + + "/etc/platypush/config.yaml") + + parser.add_argument('--target', '-t', dest='target', required=True, + help="Destination of the command") + + parser.add_argument('--action', '-a', dest='action', required=True, + help="Action to execute, as package.method") + + parser.add_argument('--backend', '-b', dest='backend', required=False, + default=None, help="Backend to deliver the message " + + "[pushbullet|kafka] (default: whatever " + + "specified in your config with pusher=True)") + + parser.add_argument('--timeout', '-T', dest='timeout', required=False, + default=cls.default_response_wait_timeout, help="The application " + + "will wait for a response for this number of seconds " + + "(default: " + str(cls.default_response_wait_timeout) + " seconds. " + "A zero value means that the application " + + " will exit without waiting for a response)") + + opts, args = parser.parse_known_args(args) + + if len(args) % 2 != 0: + raise RuntimeError('Odd number of key-value options passed: {}'.format(args)) + + opts.args = {} + for i in range(0, len(args), 2): + opts.args[re.sub('^-+', '', args[i])] = args[i+1] + + return opts + + def get_backend(self, name): + # Lazy init + if not self.backends: self.backends = init_backends(bus=self.bus) + if name not in self.backends: + raise RuntimeError('No such backend configured: {}'.format(name)) + return self.backends[name] + + def on_timeout(self): + """ Default response timeout handle: raise RuntimeError """ + def _f(): + raise RuntimeError('Response timed out') + return _f + + def default_on_response(self): + def _f(response): + print('Received response: {}'.format(response)) + os._exit(0) + return _f + + def response_wait(self, request, timeout): + # Install the timeout handler + set_timeout(seconds=timeout, on_timeout=self.on_timeout()) + + # Loop on the bus until you get a response for your request ID response_received = False while not response_received: - msg = bus.get() - response_received = isinstance(msg, Response) and ( - hasattr(msg, 'id') and msg.id == req.id) + msg = self.bus.get() + response_received = ( + isinstance(msg, Response) and + hasattr(msg, 'id') and + msg.id == request.id) - signal.alarm(0) - print(msg) - - os._exit(0) + if timeout: clear_timeout() + self.on_response(msg) -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--config', '-c', dest='config', required=False, - default=None, help="Configuration file path (default: " + - "~/.config/platypush/config.yaml or " + - "/etc/platypush/config.yaml") + def push(self, target, action, backend=None, config_file=None, + timeout=default_response_wait_timeout, **kwargs): + """ + Sends a message on a backend and optionally waits for an answer. + Params: + target -- Target node + action -- Action to be executed in the form plugin.path.method + (e.g. shell.exec or music.mpd.play) + backend -- Name of the backend that will process the request and get + the response (e.g. 'pushbullet' or 'kafka') (default: whichever + backend marked as pusher=true in your config.yaml) + timeout -- Response receive timeout in seconds + - Pusher Default: 5 seconds + - If timeout == 0 or None: Pusher exits without waiting for a response + config_file -- Path to the configuration file to be used (default: + ~/.config/platypush/config.yaml or + /etc/platypush/config.yaml) + **kwargs -- Optional key-valued arguments for the action method + (e.g. cmd='echo ping' or groups="['Living Room']") + """ - parser.add_argument('--target', '-t', dest='target', required=True, - help="Destination of the command") + def _timeout_hndl(signum, frame): + """ Default response timeout handle: raise RuntimeError and exit """ - parser.add_argument('--action', '-a', dest='action', required=True, - help="Action to execute, as package.method") + if not backend: backend = self.backend - parser.add_argument('--backend', '-b', dest='backend', required=False, - default=None, help="Backend to deliver the message " + - "[pushbullet|kafka|local] (default: whatever " + - "specified in your config with pusher=True)") + req = Request.build({ + 'target' : target, + 'action' : action, + 'args' : kwargs, + }) - parser.add_argument('--timeout', '-T', dest='timeout', required=False, - default=_DEFAULT_TIMEOUT_SEC, help="The application " + - "will wait for a response for this number of seconds " + - "(default: " + str(_DEFAULT_TIMEOUT_SEC) + " seconds. " - "A zero value means that the application " + - " will exit without waiting for a response)") + b = self.get_backend(backend) + b.start() + b.send_request(req) - opts, args = parser.parse_known_args(sys.argv[1:]) + if timeout: self.response_wait(request=req, timeout=timeout) - if len(args) % 2 != 0: - raise RuntimeError('Odd number of key-value options passed: {}'. - format(args)) - payload = {} - for i in range(0, len(args), 2): - payload[re.sub('^-+', '', args[i])] = args[i+1] +def main(args=sys.argv[1:]): + opts = Pusher.parse_build_args(args) - pusher(target=opts.target, action=opts.action, - backend=opts.backend, config=opts.config, timeout=opts.timeout, - **payload) + pusher = Pusher(config_file=opts.config, backend=opts.backend) + + pusher.push(opts.target, action=opts.action, timeout=opts.timeout, + **opts.args) if __name__ == '__main__': diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 90cf5184fc..c3248e34b0 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -1,8 +1,11 @@ import functools import importlib import logging +import signal +import time from platypush.config import Config +from platypush.message import Message modules = {} @@ -15,7 +18,7 @@ def get_or_load_plugin(plugin_name, reload=False): try: module = importlib.import_module('platypush.plugins.' + plugin_name) except ModuleNotFoundError as e: - logging.warn('No such plugin: {}'.format(plugin_name)) + logging.warning('No such plugin: {}'.format(plugin_name)) raise RuntimeError(e) # e.g. plugins.music.mpd main class: MusicMpdPlugin @@ -28,10 +31,11 @@ def get_or_load_plugin(plugin_name, reload=False): if plugin_name in Config.get_plugins() else {} try: - plugin = getattr(module, cls_name)(**plugin_conf) + plugin_class = getattr(module, cls_name) + plugin = plugin_class(**plugin_conf) modules[plugin_name] = plugin except AttributeError as e: - logging.warn('No such class in {}: {}'.format( + logging.warning('No such class in {}: {}'.format( plugin_name, cls_name)) raise RuntimeError(e) @@ -55,12 +59,64 @@ def init_backends(bus=None): b = getattr(module, cls_name)(bus=bus, **cfg) backends[k] = b except AttributeError as e: - logging.warn('No such class in {}: {}'.format( + logging.warning('No such class in {}: {}'.format( module.__name__, cls_name)) raise RuntimeError(e) return backends +def get_module_and_name_from_action(action): + """ Input : action=music.mpd.play + Output : ('music.mpd', 'play') """ + + tokens = action.split('.') + module_name = str.join('.', tokens[:-1]) + method_name = tokens[-1:][0] + return (module_name, method_name) + + +def get_message_class_by_type(msgtype): + """ Gets the class of a message type given as string """ + + try: + module = importlib.import_module('platypush.message.' + msgtype) + except ModuleNotFoundError as e: + logging.warning('Unsupported message type {}'.format(msgtype)) + raise RuntimeError(e) + + cls_name = msgtype[0].upper() + msgtype[1:] + + try: + msgclass = getattr(module, cls_name) + except AttributeError as e: + logging.warning('No such class in {}: {}'.format( + module.__name__, cls_name)) + raise RuntimeError(e) + + return msgclass + + +def set_timeout(seconds, on_timeout): + """ + Set a function to be called if timeout expires without being cleared. + It only works on the main thread. + + Params: + seconds -- Timeout in seconds + on_timeout -- Function invoked on timeout unless clear_timeout is called before + """ + + def _sighandler(signum, frame): + on_timeout() + + signal.signal(signal.SIGALRM, _sighandler) + signal.alarm(seconds) + + +def clear_timeout(): + """ Clear any previously set timeout """ + signal.alarm(0) + # vim:sw=4:ts=4:et: