diff --git a/README.md b/README.md index f435fa80..af22d8df 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,6 @@ Edit the file to include: * The host and port of the Kafka installation * The topic that will be used to deliver and process messages -### For the local socket backend - -* The name of the local FIFO that will be used to deliver and process messages - ### device_id Each target device is identified by a unique device_id in the messages sent over your account. The device_id is the hostname by default, unless changed in config.yaml. @@ -161,9 +157,9 @@ You can also write your own backends, where a backend is nothing but a thread th 5. The configuration for your module will be read from a section named `backend.voicemail` from your `config.yaml`. Its values will be passed over the backend constructor arguments. -6. Implement the `run` method. Since a backend is a thread that polls for new messages on a channel, this will be the thread main method. `_send_msg` should call `self.on_msg` at the end to post a new message to the application. +6. Implement the `run` method. Since a backend is a thread that polls for new messages on a channel, this will be the thread main method. `send_message` should call `self.on_message` at the end to post a new message to the application. -7. Implement the `_send_msg` method. This method will be called whenever the application needs to send a new message through `send_request` and `send_response`. You should never call `_send_msg` directly. +7. Implement the `send_message` method. This method will be called whenever the application needs to send a new message through `send_request` and `send_response`. You should never call `send_message` directly. The `__init__.py` will look like this: @@ -176,12 +172,12 @@ class VoicemailBackend(Backend) self.phone = phone self.voicemail = Voicemail(...) - def _send_msg(self, msg): + def send_message(self, msg): self.voicemail.save_msg(msg) def run(self): while True: msg = self.voicemail.poll() - self.on_msg(msg) + self.on_message(msg) ``` diff --git a/platypush/__init__.py b/platypush/__init__.py index 903a4360..af0e3d52 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -1,3 +1,4 @@ +import argparse import logging import sys import traceback @@ -7,7 +8,8 @@ from getopt import getopt from .bus import Bus from .config import Config -from .utils import get_or_load_plugin, init_backends +from .utils import get_or_load_plugin, init_backends, get_module_and_name_from_action +from .message.event import Event, StopEvent from .message.request import Request from .message.response import Response @@ -16,72 +18,136 @@ __version__ = '0.4' #-----------# -def _execute_request(request, retry=True): - tokens = request.action.split('.') - module_name = str.join('.', tokens[:-1]) - method_name = tokens[-1:][0] +class Daemon(object): + """ Main class for the Platypush daemon """ - try: - plugin = get_or_load_plugin(module_name) - except RuntimeError as e: # Module/class not found - logging.exception(e) - return + """ Configuration file (default: either ~/.config/platypush/config.yaml or + /etc/platypush/config.yaml) """ + config_file = None + + """ Application bus. It's an internal queue where: + - backends will post the messages they receive + - plugins will post the responses they process """ + bus = None + + """ backend_name => backend_obj map """ + backends = None + + """ number of executions retries before a request fails """ + n_tries = 2 + + def __init__(self, config_file=None): + """ Constructor + Params: + config_file -- Configuration file override (default: None) + """ + + self.config_file = config_file + Config.init(self.config_file) + logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) + + @classmethod + def build_from_cmdline(cls, args): + """ Build the app from command line arguments. + Params: + args -- Your sys.argv[1:] [List of strings] + """ + parser = argparse.ArgumentParser() + parser.add_argument('--config', '-c', dest='config', required=False, + default=None, help=cls.config_file.__doc__) + + opts, args = parser.parse_known_args(args) + return cls(config_file=opts.config) + + def on_message(self): + """ Default message handler """ + + def _f(msg): + """ on_message closure + Params: + msg -- platypush.message.Message instance """ + if isinstance(msg, Request): + logging.info('Processing request: {}'.format(msg)) + Thread(target=self.run_request(), args=(msg,)).start() + elif isinstance(msg, Response): + logging.info('Received response: {}'.format(msg)) + + return _f + + def send_response(self, request, response): + """ Sends a response back. + Params: + request -- The platypush.message.request.Request object + response -- The platypush.message.response.Response object """ - try: - response = plugin.run(method=method_name, **request.args) - if response and response.is_error(): - logging.warn('Response processed with errors: {}'.format(response)) - except Exception as e: - response = Response(output=None, errors=[str(e), traceback.format_exc()]) - logging.exception(e) - if retry: - logging.info('Reloading plugin {} and retrying'.format(module_name)) - get_or_load_plugin(module_name, reload=True) - _execute_request(request, retry=False) - finally: - # Send the response on the backend that received the request if request.backend and request.origin: if request.id: response.id = request.id response.target = request.origin logging.info('Processing response: {}'.format(response)) request.backend.send_response(response) + else: + logging.info('Ignoring response as the request has no backend: ' + .format(request)) + + def run_request(self): + """ Runs a request and returns the response """ + def _thread_func(request, n_tries=self.n_tries): + """ Thread closure method + Params: + request - platypush.message.request.Request object """ + + (module_name, method_name) = get_module_and_name_from_action(request.action) + + try: + plugin = get_or_load_plugin(module_name) + except RuntimeError as e: # Module/class not found + logging.exception(e) + return + + try: + # Run the action + response = plugin.run(method=method_name, **request.args) + if response and response.is_error(): + logging.warning('Response processed with errors: {}'.format(response)) + except Exception as e: # Retry mechanism + response = Response(output=None, errors=[str(e), traceback.format_exc()]) + logging.exception(e) + if n_tries: + logging.info('Reloading plugin {} and retrying'.format(module_name)) + get_or_load_plugin(module_name, reload=True) + _thread_func(request, n_tries=n_tries-1) + finally: + # Send the response on the backend that received the request + self.send_response(request, response) + + return _thread_func + + def start(self): + """ Start the daemon """ + self.bus = Bus(on_message=self.on_message()) + + # Initialize the backends and link them to the bus + self.backends = init_backends(self.bus) + + # Start the backend threads + for backend in self.backends.values(): + backend.start() + + # Poll for messages on the bus + try: + self.bus.poll() + except KeyboardInterrupt as e: + logging.info('SIGINT received, terminating application') + + for backend in self.backends.values(): + backend.stop() -def on_msg(msg): - if isinstance(msg, Request): - logging.info('Processing request: {}'.format(msg)) - Thread(target=_execute_request, args=(msg,)).start() - elif isinstance(msg, Response): - logging.info('Received response: {}'.format(msg)) - - -def main(): +def main(args=sys.argv[1:]): print('Starting platypush v.{}'.format(__version__)) - config_file = None - - optlist, args = getopt(sys.argv[1:], 'vh') - for opt, arg in optlist: - if opt == '-c': - config_file = arg - elif opt == '-h': - print(''' -Usage: {} [-h] [-c ] - -h Show this help - -c Path to the configuration file (default: ./config.yaml) -'''.format(sys.argv[0])) - return - - Config.init(config_file) - logging.basicConfig(level=Config.get('logging'), stream=sys.stdout) - - bus = Bus(on_msg=on_msg) - backends = init_backends(bus) - - for backend in backends.values(): - backend.start() - - bus.loop_forever() + app = Daemon.build_from_cmdline(args) + app.start() if __name__ == '__main__': main() diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index a8e3361b..3d4dac42 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -1,12 +1,15 @@ import importlib import logging import sys +import threading from threading import Thread from platypush.bus import Bus from platypush.config import Config +from platypush.utils import get_message_class_by_type from platypush.message import Message +from platypush.message.event import Event, StopEvent from platypush.message.request import Request from platypush.message.response import Response @@ -25,43 +28,16 @@ class Backend(Thread): # the received messages will be pushed self.bus = bus if bus else Bus() self.device_id = Config.get('device_id') - self.msgtypes = {} + self.thread_id = None + self._stop = False Thread.__init__(self) logging.basicConfig(stream=sys.stdout, level=Config.get('logging') if 'logging' not in kwargs else getattr(logging, kwargs['logging'])) - def is_local(self): - """ Returns true if this is a local backend """ - from platypush.backend.local import LocalBackend - return isinstance(self, LocalBackend) - def _get_msgtype_class(self, msgtype): - """ Gets the class of a message type """ - - if msgtype in self.msgtypes: return self.msgtypes[msgtype] - - try: - module = importlib.import_module('platypush.message.' + msgtype) - except ModuleNotFoundError as e: - logging.warn('Unsupported message type {}'.format(msgtype)) - raise RuntimeError(e) - - cls_name = msgtype[0].upper() + msgtype[1:] - - try: - msgclass = getattr(module, cls_name) - self.msgtypes[msgtype] = msgclass - except AttributeError as e: - logging.warn('No such class in {}: {}'.format( - module.__name__, cls_name)) - raise RuntimeError(e) - - return msgclass - - - def on_msg(self, msg): + def on_message(self, msg): """ Callback when a message is received on the backend. It parses and posts the message on the main bus. @@ -76,18 +52,26 @@ class Backend(Thread): msg = Message.parse(msg) if 'type' not in msg: - logging.warn('Ignoring message with no type: {}'.format(msg)) + logging.warning('Ignoring message with no type: {}'.format(msg)) return - msgtype = self._get_msgtype_class(msg['type']) + msgtype = get_message_class_by_type(msg['type']) msg = msgtype.build(msg) - if not getattr(msg, 'target') or (msg.target != self.device_id and not self.is_local()): + if not getattr(msg, 'target') or msg.target != self.device_id: return # Not for me - logging.debug('Message received on the backend: {}'.format(msg)) + logging.info('Message received on the {} backend: {}'.format( + self.__class__.__name__, msg)) + msg.backend = self # Augment message - self.bus.post(msg) + + if isinstance(msg, StopEvent) and msg.targets_me(): + logging.info('Received STOP event on the {} backend: {}'.format( + self.__class__.__name__, msg)) + self._stop = True + else: + self.bus.post(msg) def send_request(self, request): """ @@ -101,7 +85,7 @@ class Backend(Thread): assert isinstance(request, Request) request.origin = self.device_id - self._send_msg(request) + self.send_message(request) def send_response(self, response): """ @@ -115,27 +99,39 @@ class Backend(Thread): assert isinstance(response, Response) response.origin = self.device_id - self._send_msg(response) + self.send_message(response) - def _send_msg(self, msg): + + def send_message(self, msg): """ Sends a platypush.message.Message to a node. To be implemented in the derived classes. - Always call send_request or send_response instead of _send_msg directly + Always call send_request or send_response instead of send_message directly Param: msg -- The message """ - raise NotImplementedError("_send_msg should be implemented in a derived class") + raise NotImplementedError("send_message should be implemented in a derived class") def run(self): """ Starts the backend thread. To be implemented in the derived classes """ - raise NotImplementedError("run should be implemented in a derived class") + self.thread_id = threading.get_ident() + + def on_stop(self): + """ Callback invoked when the process stops """ + pass def stop(self): - """ Stops the backend thread (default: do nothing) """ - pass + """ Stops the backend thread by sending a STOP event on its bus """ + evt = StopEvent(target=self.device_id, origin=self.device_id, + thread_id=self.thread_id) + + self.send_message(evt) + self.on_stop() + + def should_stop(self): + return self._stop # vim:sw=4:ts=4:et: diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index bb8d8341..91acb836 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -1,11 +1,14 @@ import logging import json +import time from kafka import KafkaConsumer, KafkaProducer from .. import Backend class KafkaBackend(Backend): + _conn_retry_secs = 5 + def __init__(self, server, topic, **kwargs): super().__init__(**kwargs) @@ -23,7 +26,7 @@ class KafkaBackend(Backend): logging.exception(e) logging.debug('Received message: {}'.format(msg)) - self.on_msg(msg) + self.on_message(msg) def _init_producer(self): if not self.producer: @@ -32,7 +35,7 @@ class KafkaBackend(Backend): def _topic_by_device_id(self, device_id): return '{}.{}'.format(self.topic_prefix, device_id) - def _send_msg(self, msg): + def send_message(self, msg): target = msg.target msg = str(msg).encode('utf-8') @@ -40,13 +43,32 @@ class KafkaBackend(Backend): self.producer.send(self._topic_by_device_id(target), msg) self.producer.flush() + def on_stop(self): + try: + if self.producer: + self.producer.flush() + self.producer.close() + + if self.consumer: + self.consumer.close() + except: pass + def run(self): + super().run() + self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) logging.info('Initialized kafka backend - server: {}, topic: {}' .format(self.server, self.topic)) - for msg in self.consumer: - self._on_record(msg) + try: + for msg in self.consumer: + self._on_record(msg) + if self.should_stop(): + break + except ConnectionError: + logging.warning('Kafka connection error, retrying in {} seconds'. + format(self._conn_retry_secs)) + time.sleep(self._conn_retry_secs) # vim:sw=4:ts=4:et: diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py deleted file mode 100644 index 6de045e5..00000000 --- a/platypush/backend/local/__init__.py +++ /dev/null @@ -1,48 +0,0 @@ -import logging -import json -import os -import time - -from .. import Backend - -class LocalBackend(Backend): - def __init__(self, fifo, **kwargs): - super().__init__(**kwargs) - - self.fifo = fifo - - def _send_msg(self, msg): - msglen = len(str(msg))+1 # Include \n - - # Message format: b"\n\n" - msg = bytearray((str(msglen) + '\n' + str(msg) + '\n').encode('utf-8')) - with open(self.fifo, 'wb') as f: - f.write(msg) - - def run(self): - try: os.mkfifo(self.fifo) - except FileExistsError as e: pass - logging.info('Initialized local backend on fifo {}'.format(self.fifo)) - - with open(self.fifo, 'rb', 0) as f: - while True: - try: - msglen = int(f.readline()) - except ValueError as e: - time.sleep(0.1) - continue - - msg = f.read(msglen-1) - if not msg: continue - - try: - msg = json.loads(msg.decode('utf-8')) - except Exception as e: - logging.exception(e) - continue - - logging.debug('Received message: {}'.format(msg)) - self.on_msg(msg) - -# vim:sw=4:ts=4:et: - diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index cddf5a8d..9ec630d9 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -18,8 +18,9 @@ class PushbulletBackend(Backend): self.pb_device_id = self.get_device_id() self._last_received_msg = { - 'request': { 'body': None, 'time': None }, - 'response': { 'body': None, 'time': None }, + 'request' : { 'body': None, 'time': None }, + 'response' : { 'body': None, 'time': None }, + 'event' : { 'body': None, 'time': None }, } def _get_latest_push(self): @@ -64,51 +65,49 @@ class PushbulletBackend(Backend): return is_duplicate - @staticmethod - def _on_msg(backend): + def on_push(self): def _f(ws, data): try: - data = json.loads(data) if isinstance(data, str) else push + try: + data = json.loads(data) if isinstance(data, str) else push + except Exception as e: + logging.exception(e) + return + + if data['type'] == 'tickle' and data['subtype'] == 'push': + push = self._get_latest_push() + elif data['type'] == 'push': + push = data['push'] + else: return # Not a push notification + + logging.debug('Received push: {}'.format(push)) + + body = push['body'] + try: body = json.loads(body) + except ValueError as e: return # Some other non-JSON push + + if not self._should_skip_last_received_msg(body): + self.on_message(body) except Exception as e: logging.exception(e) return - if data['type'] == 'tickle' and data['subtype'] == 'push': - push = backend._get_latest_push() - elif data['type'] == 'push': - push = data['push'] - else: - return # Not a push notification - - logging.debug('Received push: {}'.format(push)) - - if 'body' not in push: return - - body = push['body'] - try: body = json.loads(body) - except ValueError as e: return - - if not backend._should_skip_last_received_msg(body): - backend.on_msg(body) - return _f - @staticmethod - def _on_error(backend): + def on_error(self): def _f(ws, e): logging.exception(e) logging.info('Restarting PushBullet backend') ws.close() - backend._init_socket() + self._init_socket() return _f def _init_socket(self): self.ws = websocket.WebSocketApp( 'wss://stream.pushbullet.com/websocket/' + self.token, - # on_message = self._on_msg, - on_message = self._on_msg(self), - on_error = self._on_error(self)) + on_message = self.on_push(), + on_error = self.on_error()) def get_device_id(self): response = requests.get( @@ -125,7 +124,7 @@ class PushbulletBackend(Backend): return devices[0]['iden'] - def _send_msg(self, msg): + def send_message(self, msg): requests.post( u'https://api.pushbullet.com/v2/pushes', headers = { 'Access-Token': self.token }, @@ -136,7 +135,12 @@ class PushbulletBackend(Backend): } ).json() + def on_stop(self): + self.ws.close() + def run(self): + super().run() + self._init_socket() logging.info('Initialized Pushbullet backend - device_id: {}' .format(self.device_name)) diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 52010c17..c2c53ce5 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -3,18 +3,17 @@ import sys import signal import logging +from enum import Enum from queue import Queue +from platypush.message.event import Event, StopEvent + class Bus(object): """ Main local bus where the daemon will listen for new messages """ - """ Number of seconds to wait for any pending threads - before the process returns to the OS """ - _kill_sec_timeout = 5 - - def __init__(self, on_msg=None): + def __init__(self, on_message=None): self.bus = Queue() - self.on_msg = on_msg + self.on_message = on_message def post(self, msg): """ Sends a message to the bus """ @@ -24,25 +23,24 @@ class Bus(object): """ Reads one message from the bus """ return self.bus.get() - def loop_forever(self): - """ Reads messages from the bus until KeyboardInterrupt """ - def _on_stop_timeout(signum, frame): - logging.warn('Stopping all the active threads after waiting for ' + - '{} seconds'.format(self._kill_sec_timeout)) - os._exit(1) + def poll(self): + """ + Reads messages from the bus until either stop event message or KeyboardInterrupt + """ - if not self.on_msg: return + if not self.on_message: + logging.warning('No message handlers installed, cannot poll') + return - while True: - try: - self.on_msg(self.get()) - except KeyboardInterrupt: - logging.info('Received keyboard interrupt ' + - '- terminating application') + stop=False + while not stop: + msg = self.get() + self.on_message(msg) + + if isinstance(msg, StopEvent) and msg.targets_me(): + logging.info('Received STOP event') + stop=True - signal.signal(signal.SIGALRM, _on_stop_timeout) - signal.alarm(self._kill_sec_timeout) - sys.exit(0) # vim:sw=4:ts=4:et: diff --git a/platypush/config.example.yaml b/platypush/config.example.yaml index 932f4837..706ca35a 100644 --- a/platypush/config.example.yaml +++ b/platypush/config.example.yaml @@ -11,9 +11,6 @@ backend.pushbullet: token: your_pushbullet_token_here device: your_pushbullet_virtual_device_name -backend.local: - fifo: /tmp/platypush.fifo - # device_id: (default: current hostname) # debug: True (default: False) diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 0dc2bcf1..1427aa53 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -38,7 +38,7 @@ class Message(object): if isinstance(msg, bytes) or isinstance(msg, bytearray): msg = msg.decode('utf-8') if isinstance(msg, str): - msg = json.loads(msg) + msg = json.loads(msg.strip()) assert isinstance(msg, dict) return msg diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py new file mode 100644 index 00000000..973bc5a8 --- /dev/null +++ b/platypush/message/event/__init__.py @@ -0,0 +1,112 @@ +import json +import random +import threading + +from enum import Enum + +from platypush.message import Message + +class Event(Message): + """ Event message class """ + + def __init__(self, target, type, origin, id=None, **kwargs): + """ + Params: + target -- Target node [String] + type -- Event type [EventType] + origin -- Origin node (default: current node) [String] + id -- Event ID (default: auto-generated) + kwargs -- Additional arguments for the event [kwDict] + """ + + self.id = id if id else self._generate_id() + self.target = target + self.origin = origin + self.type = type + self.args = kwargs + + @classmethod + def build(cls, msg): + """ Builds an event message from a JSON UTF-8 string/bytearray, a + dictionary, or another Event """ + + msg = super().parse(msg) + event_type = msg['args'].pop('type') + event_class = getattr(EventType, event_type).cls + + args = { + 'target' : msg['target'], + 'origin' : msg['origin'], + **(msg['args'] if 'args' in msg else {}), + } + + args['id'] = msg['id'] if 'id' in msg else cls._generate_id() + return event_class(**args) + + @staticmethod + def _generate_id(): + """ Generate a unique event ID """ + id = '' + for i in range(0,16): + id += '%.2x' % random.randint(0, 255) + return id + + def __str__(self): + """ + Overrides the str() operator and converts + the message into a UTF-8 JSON string + """ + + return json.dumps({ + 'type' : 'event', + 'target' : self.target, + 'origin' : self.origin if hasattr(self, 'origin') else None, + 'id' : self.id if hasattr(self, 'id') else None, + 'args' : { + 'type' : self.type.name, + **self.args, + }, + }) + + +class StopEvent(Event): + """ StopEvent message. When received on a Bus, it will terminate the + listening thread having the specified ID. Useful to keep listeners in + sync and make them quit when the application terminates """ + + def __init__(self, target, origin, thread_id, id=None, **kwargs): + """ Constructor. + Params: + target -- Target node + origin -- Origin node + thread_id -- thread_iden() to be terminated if listening on the bus + id -- Event ID (default: auto-generated) + kwargs -- Extra key-value arguments + """ + + super().__init__(target=target, origin=origin, id=id, + type=EventType.STOP, thread_id=thread_id, **kwargs) + + def targets_me(self): + """ Returns true if the stop event is for the current thread """ + return self.args['thread_id'] == threading.get_ident() + + +class EventType(Enum): + """ Event types enum """ + + def __new__(cls, *args, **kwds): + value = len(cls.__members__) + 1 + obj = object.__new__(cls) + obj._value_ = value + return obj + + def __init__(self, label, cls): + self.label = label + self.cls = cls + + STOP = 'STOP', StopEvent + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index f0083c32..464f1277 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -33,7 +33,7 @@ class Request(Message): args['id'] = msg['id'] if 'id' in msg else cls._generate_id() if 'origin' in msg: args['origin'] = msg['origin'] - return Request(**args) + return cls(**args) @staticmethod def _generate_id(): diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index a7156eff..3f865bb2 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -36,7 +36,7 @@ class Response(Message): if 'id' in msg: args['id'] = msg['id'] if 'origin' in msg: args['origin'] = msg['origin'] - return Response(**args) + return cls(**args) def __str__(self): diff --git a/platypush/plugins/shell/__init__.py b/platypush/plugins/shell/__init__.py index f0b09a7f..27bc8d09 100644 --- a/platypush/plugins/shell/__init__.py +++ b/platypush/plugins/shell/__init__.py @@ -7,15 +7,15 @@ from .. import Plugin class ShellPlugin(Plugin): def exec(self, cmd): output = None - error = None + errors = [] try: output = subprocess.check_output( cmd, stderr=subprocess.STDOUT, shell=True).decode('utf-8') except subprocess.CalledProcessError as e: - error = e.output.decode('utf-8') + errors = [e.output.decode('utf-8')] - return Response(output=output, errors=[error]) + return Response(output=output, errors=errors) # vim:sw=4:ts=4:et: diff --git a/platypush/pusher/__init__.py b/platypush/pusher/__init__.py index a0c45b18..fca8dde3 100755 --- a/platypush/pusher/__init__.py +++ b/platypush/pusher/__init__.py @@ -1,100 +1,180 @@ import argparse import os import re -import signal import sys from platypush.bus import Bus from platypush.config import Config from platypush.message.request import Request from platypush.message.response import Response -from platypush.utils import init_backends +from platypush.utils import init_backends, set_timeout, clear_timeout -_DEFAULT_TIMEOUT_SEC=5 +class Pusher(object): + """ + Main class to send messages and events to a node + """ -def pusher(target, action, backend=None, config=None, - timeout=_DEFAULT_TIMEOUT_SEC, **kwargs): - def on_timeout(signum, frame): - raise RuntimeError('Response timed out after {} seconds'.format( - timeout)) - os._exit(0) + """ Configuration file path """ + config_file = None - Config.init(config) + """ Default backend name """ + backend = None - if target == 'localhost': - backend = 'local' - elif not backend: - backend = Config.get_default_pusher_backend() + """ Pusher local bus. The response will be processed here """ + bus = None - req = Request.build({ - 'target' : target, - 'action' : action, - 'args' : kwargs, - }) + """ Configured backends as a name => object map """ + backends = {} - bus = Bus() - backends = init_backends(bus=bus) - if backend not in backends: - raise RuntimeError('No such backend configured: {}'.format(backend)) + """ Default response_wait timeout """ + default_response_wait_timeout = 5 - b = backends[backend] - b.start() - b.send_request(req) - if timeout: - signal.signal(signal.SIGALRM, on_timeout) - signal.alarm(timeout) + def __init__(self, config_file=None, backend=None, on_response=None): + """ + Constructor. + Params: + config_file -- Path to the configuration file - default: + ~/.config/platypush/config.yaml or + /etc/platypush/config.yaml) + backend -- Name of the backend where pusher will send the + request and wait for the response (kafka + or pushbullet). Default: whatever is specified + with pusher=true in your configuration file + on_response -- Method that will be invoked upon response receipt. + Takes a platypush.message.response.Response as arg. + Default: print the response and exit. + """ + # Initialize the configuration + self.config_file = config_file + Config.init(config_file) + + self.on_response = on_response or self.default_on_response() + self.backend = backend or Config.get_default_pusher_backend() + self.bus = Bus() + + + @classmethod + def parse_build_args(cls, args): + """ Parse the recognized options from a list of cmdline arguments """ + parser = argparse.ArgumentParser() + parser.add_argument('--config', '-c', dest='config', required=False, + default=None, help="Configuration file path (default: " + + "~/.config/platypush/config.yaml or " + + "/etc/platypush/config.yaml") + + parser.add_argument('--target', '-t', dest='target', required=True, + help="Destination of the command") + + parser.add_argument('--action', '-a', dest='action', required=True, + help="Action to execute, as package.method") + + parser.add_argument('--backend', '-b', dest='backend', required=False, + default=None, help="Backend to deliver the message " + + "[pushbullet|kafka] (default: whatever " + + "specified in your config with pusher=True)") + + parser.add_argument('--timeout', '-T', dest='timeout', required=False, + default=cls.default_response_wait_timeout, help="The application " + + "will wait for a response for this number of seconds " + + "(default: " + str(cls.default_response_wait_timeout) + " seconds. " + "A zero value means that the application " + + " will exit without waiting for a response)") + + opts, args = parser.parse_known_args(args) + + if len(args) % 2 != 0: + raise RuntimeError('Odd number of key-value options passed: {}'.format(args)) + + opts.args = {} + for i in range(0, len(args), 2): + opts.args[re.sub('^-+', '', args[i])] = args[i+1] + + return opts + + def get_backend(self, name): + # Lazy init + if not self.backends: self.backends = init_backends(bus=self.bus) + if name not in self.backends: + raise RuntimeError('No such backend configured: {}'.format(name)) + return self.backends[name] + + def on_timeout(self): + """ Default response timeout handle: raise RuntimeError """ + def _f(): + raise RuntimeError('Response timed out') + return _f + + def default_on_response(self): + def _f(response): + print('Received response: {}'.format(response)) + os._exit(0) + return _f + + def response_wait(self, request, timeout): + # Install the timeout handler + set_timeout(seconds=timeout, on_timeout=self.on_timeout()) + + # Loop on the bus until you get a response for your request ID response_received = False while not response_received: - msg = bus.get() - response_received = isinstance(msg, Response) and ( - hasattr(msg, 'id') and msg.id == req.id) + msg = self.bus.get() + response_received = ( + isinstance(msg, Response) and + hasattr(msg, 'id') and + msg.id == request.id) - signal.alarm(0) - print(msg) - - os._exit(0) + if timeout: clear_timeout() + self.on_response(msg) -def main(): - parser = argparse.ArgumentParser() - parser.add_argument('--config', '-c', dest='config', required=False, - default=None, help="Configuration file path (default: " + - "~/.config/platypush/config.yaml or " + - "/etc/platypush/config.yaml") + def push(self, target, action, backend=None, config_file=None, + timeout=default_response_wait_timeout, **kwargs): + """ + Sends a message on a backend and optionally waits for an answer. + Params: + target -- Target node + action -- Action to be executed in the form plugin.path.method + (e.g. shell.exec or music.mpd.play) + backend -- Name of the backend that will process the request and get + the response (e.g. 'pushbullet' or 'kafka') (default: whichever + backend marked as pusher=true in your config.yaml) + timeout -- Response receive timeout in seconds + - Pusher Default: 5 seconds + - If timeout == 0 or None: Pusher exits without waiting for a response + config_file -- Path to the configuration file to be used (default: + ~/.config/platypush/config.yaml or + /etc/platypush/config.yaml) + **kwargs -- Optional key-valued arguments for the action method + (e.g. cmd='echo ping' or groups="['Living Room']") + """ - parser.add_argument('--target', '-t', dest='target', required=True, - help="Destination of the command") + def _timeout_hndl(signum, frame): + """ Default response timeout handle: raise RuntimeError and exit """ - parser.add_argument('--action', '-a', dest='action', required=True, - help="Action to execute, as package.method") + if not backend: backend = self.backend - parser.add_argument('--backend', '-b', dest='backend', required=False, - default=None, help="Backend to deliver the message " + - "[pushbullet|kafka|local] (default: whatever " + - "specified in your config with pusher=True)") + req = Request.build({ + 'target' : target, + 'action' : action, + 'args' : kwargs, + }) - parser.add_argument('--timeout', '-T', dest='timeout', required=False, - default=_DEFAULT_TIMEOUT_SEC, help="The application " + - "will wait for a response for this number of seconds " + - "(default: " + str(_DEFAULT_TIMEOUT_SEC) + " seconds. " - "A zero value means that the application " + - " will exit without waiting for a response)") + b = self.get_backend(backend) + b.start() + b.send_request(req) - opts, args = parser.parse_known_args(sys.argv[1:]) + if timeout: self.response_wait(request=req, timeout=timeout) - if len(args) % 2 != 0: - raise RuntimeError('Odd number of key-value options passed: {}'. - format(args)) - payload = {} - for i in range(0, len(args), 2): - payload[re.sub('^-+', '', args[i])] = args[i+1] +def main(args=sys.argv[1:]): + opts = Pusher.parse_build_args(args) - pusher(target=opts.target, action=opts.action, - backend=opts.backend, config=opts.config, timeout=opts.timeout, - **payload) + pusher = Pusher(config_file=opts.config, backend=opts.backend) + + pusher.push(opts.target, action=opts.action, timeout=opts.timeout, + **opts.args) if __name__ == '__main__': diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 90cf5184..c3248e34 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -1,8 +1,11 @@ import functools import importlib import logging +import signal +import time from platypush.config import Config +from platypush.message import Message modules = {} @@ -15,7 +18,7 @@ def get_or_load_plugin(plugin_name, reload=False): try: module = importlib.import_module('platypush.plugins.' + plugin_name) except ModuleNotFoundError as e: - logging.warn('No such plugin: {}'.format(plugin_name)) + logging.warning('No such plugin: {}'.format(plugin_name)) raise RuntimeError(e) # e.g. plugins.music.mpd main class: MusicMpdPlugin @@ -28,10 +31,11 @@ def get_or_load_plugin(plugin_name, reload=False): if plugin_name in Config.get_plugins() else {} try: - plugin = getattr(module, cls_name)(**plugin_conf) + plugin_class = getattr(module, cls_name) + plugin = plugin_class(**plugin_conf) modules[plugin_name] = plugin except AttributeError as e: - logging.warn('No such class in {}: {}'.format( + logging.warning('No such class in {}: {}'.format( plugin_name, cls_name)) raise RuntimeError(e) @@ -55,12 +59,64 @@ def init_backends(bus=None): b = getattr(module, cls_name)(bus=bus, **cfg) backends[k] = b except AttributeError as e: - logging.warn('No such class in {}: {}'.format( + logging.warning('No such class in {}: {}'.format( module.__name__, cls_name)) raise RuntimeError(e) return backends +def get_module_and_name_from_action(action): + """ Input : action=music.mpd.play + Output : ('music.mpd', 'play') """ + + tokens = action.split('.') + module_name = str.join('.', tokens[:-1]) + method_name = tokens[-1:][0] + return (module_name, method_name) + + +def get_message_class_by_type(msgtype): + """ Gets the class of a message type given as string """ + + try: + module = importlib.import_module('platypush.message.' + msgtype) + except ModuleNotFoundError as e: + logging.warning('Unsupported message type {}'.format(msgtype)) + raise RuntimeError(e) + + cls_name = msgtype[0].upper() + msgtype[1:] + + try: + msgclass = getattr(module, cls_name) + except AttributeError as e: + logging.warning('No such class in {}: {}'.format( + module.__name__, cls_name)) + raise RuntimeError(e) + + return msgclass + + +def set_timeout(seconds, on_timeout): + """ + Set a function to be called if timeout expires without being cleared. + It only works on the main thread. + + Params: + seconds -- Timeout in seconds + on_timeout -- Function invoked on timeout unless clear_timeout is called before + """ + + def _sighandler(signum, frame): + on_timeout() + + signal.signal(signal.SIGALRM, _sighandler) + signal.alarm(seconds) + + +def clear_timeout(): + """ Clear any previously set timeout """ + signal.alarm(0) + # vim:sw=4:ts=4:et: