diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index 33573283..47c6d3ee 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -1,3 +1,4 @@ +import ast import json import logging import threading @@ -34,10 +35,16 @@ class RedisBus(Bus): try: msg = self.redis.blpop(self.redis_queue) - if msg and msg[1]: - msg = Message.build(json.loads(msg[1].decode('utf-8'))) - else: - msg = None + if not msg or msg[1] is None: + return + + msg = msg[1].decode('utf-8') + try: + msg = json.loads(msg) + except json.decoder.JSONDecodeError: + msg = ast.literal_eval(msg) + + msg = Message.build(msg) except Exception as e: logger.exception(e) @@ -49,4 +56,3 @@ class RedisBus(Bus): # vim:sw=4:ts=4:et: - diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 3bbcabc8..d1eb7447 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -1,16 +1,20 @@ import datetime +import importlib +import inspect import logging import os +import pkgutil import re import socket import sys import yaml -from platypush.utils import get_hash +from platypush.utils import get_hash, is_functional_procedure, is_functional_hook """ Config singleton instance """ _default_config_instance = None + class Config(object): """ Configuration base class @@ -57,7 +61,7 @@ class Config(object): raise RuntimeError('No config file specified and nothing found in {}' .format(self._cfgfile_locations)) - self._cfgfile = cfgfile + self._cfgfile = os.path.abspath(os.path.expanduser(cfgfile)) self._config = self._read_config_file(self._cfgfile) if 'token' in self._config: @@ -68,6 +72,15 @@ class Config(object): self._config['workdir'] = self._workdir_location os.makedirs(self._config['workdir'], exist_ok=True) + if 'scripts_dir' not in self._config: + self._config['scripts_dir'] = os.path.join(os.path.dirname(cfgfile), 'scripts') + os.makedirs(self._config['scripts_dir'], mode=0o755, exist_ok=True) + + init_py = os.path.join(self._config['scripts_dir'], '__init__.py') + if not os.path.isfile(init_py): + with open(init_py, 'w') as f: + f.write('# Auto-generated __init__.py - do not remove\n') + self._config['db'] = self._config.get('main.db', { 'engine': 'sqlite:///' + os.path.join( os.path.expanduser('~'), '.local', 'share', 'platypush', 'main.db') @@ -80,7 +93,7 @@ class Config(object): } if 'logging' in self._config: - for (k,v) in self._config['logging'].items(): + for (k, v) in self._config['logging'].items(): if k == 'filename': logfile = os.path.expanduser(v) logdir = os.path.dirname(logfile) @@ -106,7 +119,7 @@ class Config(object): self._config['device_id'] = socket.gethostname() if 'environment' in self._config: - for k,v in self._config['environment'].items(): + for k, v in self._config['environment'].items(): os.environ[k] = str(v) self.backends = {} @@ -117,17 +130,18 @@ class Config(object): self.cronjobs = {} self._init_constants() + self._load_scripts() self._init_components() @staticmethod def _is_special_token(token): return token == 'main.db' or \ - token == 'token' or \ - token == 'token_hash' or \ - token == 'logging' or \ - token == 'workdir' or \ - token == 'device_id' or \ - token == 'environment' + token == 'token' or \ + token == 'token_hash' or \ + token == 'logging' or \ + token == 'workdir' or \ + token == 'device_id' or \ + token == 'environment' def _read_config_file(self, cfgfile): cfgfile_dir = os.path.dirname(os.path.abspath( @@ -155,12 +169,44 @@ class Config(object): included_config = self._read_config_file(include_file) for incl_section in included_config.keys(): config[incl_section] = included_config[incl_section] + elif section == 'scripts_dir': + assert isinstance(file_config[section], str) + config['scripts_dir'] = os.path.abspath(os.path.expanduser(file_config[section])) elif 'disabled' not in file_config[section] \ or file_config[section]['disabled'] is False: config[section] = file_config[section] return config + def _load_scripts(self): + scripts_dir = self._config['scripts_dir'] + sys_path = sys.path.copy() + sys.path = [scripts_dir] + sys.path + + for x, modname, y in pkgutil.walk_packages(path=[scripts_dir], onerror=lambda x: None): + try: + module = importlib.import_module(modname) + except Exception as e: + print('Unhandled exception while importing module {} from {}: {}'.format( + modname, scripts_dir, str(e) + )) + + continue + + self.procedures.update(**{ + modname + '.' + name: obj + for name, obj in inspect.getmembers(module) + if is_functional_procedure(obj) + }) + + self.event_hooks.update(**{ + modname + '.' + name: obj + for name, obj in inspect.getmembers(module) + if is_functional_hook(obj) + }) + + sys.path = sys_path + def _init_components(self): for key in self._config.keys(): if key.startswith('backend.'): @@ -199,38 +245,42 @@ class Config(object): if 'constants' in self._config: self.constants = self._config['constants'] - for (key,value) in self._default_constants.items(): + for (key, value) in self._default_constants.items(): self.constants[key] = value - @staticmethod def get_backends(): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() return _default_config_instance.backends @staticmethod def get_plugins(): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() return _default_config_instance.plugins @staticmethod def get_event_hooks(): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() return _default_config_instance.event_hooks @staticmethod def get_procedures(): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() return _default_config_instance.procedures @staticmethod def get_constants(): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() constants = {} for name in _default_config_instance.constants.keys(): @@ -240,16 +290,19 @@ class Config(object): @staticmethod def get_constant(name): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() - if name not in _default_config_instance.constants: return None + if name not in _default_config_instance.constants: + return None value = _default_config_instance.constants[name] return value() if callable(value) else value @staticmethod def get_cronjobs(): global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() return _default_config_instance.cronjobs @staticmethod @@ -263,11 +316,11 @@ class Config(object): return backends[0] if backends else None - @classmethod def _get_default_cfgfile(cls): for location in cls._cfgfile_locations: - if os.path.isfile(location): return location + if os.path.isfile(location): + return location @staticmethod def init(cfgfile=None): @@ -287,9 +340,8 @@ class Config(object): key -- Config key to get """ global _default_config_instance - if _default_config_instance is None: _default_config_instance = Config() + if _default_config_instance is None: + _default_config_instance = Config() return _default_config_instance._config.get(key) - # vim:sw=4:ts=4:et: - diff --git a/platypush/event/hook.py b/platypush/event/hook.py index 7d6e9691..d8ba1aea 100644 --- a/platypush/event/hook.py +++ b/platypush/event/hook.py @@ -1,14 +1,14 @@ import copy import json import logging -import re import threading +from functools import wraps from platypush.config import Config -from platypush.message.event import Event, EventMatchResult +from platypush.message.event import Event from platypush.message.request import Request from platypush.procedure import Procedure -from platypush.utils import get_event_class_by_type, set_thread_name +from platypush.utils import get_event_class_by_type, set_thread_name, is_functional_hook logger = logging.getLogger(__name__) @@ -22,7 +22,7 @@ def parse(msg): if isinstance(msg, str): try: msg = json.loads(msg.strip()) - except: + except json.JSONDecodeError: logger.warning('Invalid JSON message: {}'.format(msg)) return None @@ -57,10 +57,12 @@ class EventCondition(object): """ Builds a rule given either another EventRule, a dictionary or a JSON UTF-8 encoded string/bytearray """ - if isinstance(rule, cls): return rule - else: rule = parse(rule) - assert isinstance(rule, dict) + if isinstance(rule, cls): + return rule + else: + rule = parse(rule) + assert isinstance(rule, dict) type = get_event_class_by_type( rule.pop('type') if 'type' in rule else 'Event') @@ -76,11 +78,11 @@ class EventAction(Request): whose fields can be configured later depending on the event context """ def __init__(self, target=None, action=None, **args): - if target is None: target=Config.get('device_id') + if target is None: + target = Config.get('device_id') args_copy = copy.deepcopy(args) super().__init__(target=target, action=action, **args_copy) - @classmethod def build(cls, action): action = super().parse(action) @@ -97,11 +99,11 @@ class EventAction(Request): class EventHook(object): - """ Event hook class. It consists of one conditionss and + """ Event hook class. It consists of one conditions and one or multiple actions to be executed """ - def __init__(self, name, priority=None, condition=None, actions=[]): - """ Construtor. Takes a name, a EventCondition object and an event action + def __init__(self, name, priority=None, condition=None, actions=None): + """ Constructor. Takes a name, a EventCondition object and an event action procedure as input. It may also have a priority attached as a positive number. If multiple hooks match against an event, only the ones that have either the maximum match score or the @@ -109,20 +111,25 @@ class EventHook(object): self.name = name self.condition = EventCondition.build(condition or {}) - self.actions = actions + self.actions = actions or [] self.priority = priority or 0 self.condition.priority = self.priority - @classmethod def build(cls, name, hook): """ Builds a rule given either another EventRule, a dictionary or a JSON UTF-8 encoded string/bytearray """ - if isinstance(hook, cls): return hook - else: hook = parse(hook) - assert isinstance(hook, dict) + if isinstance(hook, cls): + return hook + else: + hook = parse(hook) + if is_functional_hook(hook): + actions = Procedure(name=name, requests=[hook], _async=False) + return cls(name=name, condition=hook.condition, actions=actions) + + assert isinstance(hook, dict) condition = EventCondition.build(hook['if']) if 'if' in hook else None actions = [] priority = hook['priority'] if 'priority' in hook else None @@ -134,17 +141,15 @@ class EventHook(object): else: actions = [hook['then']] - actions = Procedure.build(name=name+'__Hook', requests=actions, _async=False) + actions = Procedure.build(name=name + '__Hook', requests=actions, _async=False) return cls(name=name, condition=condition, actions=actions, priority=priority) - def matches_event(self, event): """ Returns an EventMatchResult object containing the information about the match between the event and this hook """ return event.matches_condition(self.condition) - def run(self, event): """ Checks the condition of the hook against a particular event and runs the hook actions if the condition is met """ @@ -154,14 +159,30 @@ class EventHook(object): self.actions.execute(event=event, **result.parsed_args) result = self.matches_event(event) - token = Config.get('token') if result.is_match: logger.info('Running hook {} triggered by an event'.format(self.name)) - threading.Thread(target=_thread_func, - name='Event-' + self.name, - args=(result,)).start() + threading.Thread(target=_thread_func, name='Event-' + self.name, args=(result,)).start() + + +def hook(f, event_type=Event, **condition): + f.hook = True + f.condition = EventCondition(type=event_type, **condition) + + @wraps(f) + def _execute_hook(*args, **kwargs): + from platypush import Response + + try: + ret = f(*args, **kwargs) + if isinstance(ret, Response): + return ret + + return Response(output=ret) + except Exception as e: + return Response(errors=[str(e)]) + + return _execute_hook # vim:sw=4:ts=4:et: - diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 5b93062a..796a837a 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -12,7 +12,8 @@ from platypush.config import Config from platypush.context import get_plugin from platypush.message import Message from platypush.message.response import Response -from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message +from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \ + is_functional_procedure logger = logging.getLogger(__name__) @@ -69,8 +70,19 @@ class Request(Message): from platypush.procedure import Procedure logger.info('Executing procedure request: {}'.format(self.action)) - proc_name = self.action.split('.')[-1] - proc_config = Config.get_procedures()[proc_name] + procedures = Config.get_procedures() + proc_name = '.'.join(self.action.split('.')[1:]) + if proc_name not in procedures: + proc_name = self.action.split('.')[-1] + + proc_config = procedures[proc_name] + if is_functional_procedure(proc_config): + kwargs.update(**self.args) + if 'n_tries' in kwargs: + del kwargs['n_tries'] + + return proc_config(*args, **kwargs) + proc = Procedure.build(name=proc_name, requests=proc_config['actions'], _async=proc_config['_async'], args=self.args, backend=self.backend, id=self.id) diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 4e5db70e..af873425 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -1,6 +1,7 @@ import enum import logging import re +from functools import wraps from queue import LifoQueue from ..config import Config @@ -179,6 +180,10 @@ class Procedure(object): token = Config.get('token') for request in self.requests: + if callable(request): + response = request(**context) + continue + if isinstance(request, Statement): if request == Statement.RETURN: self._should_return = True @@ -461,4 +466,21 @@ class IfProcedure(Procedure): return response +def procedure(f): + f.procedure = True + + @wraps(f) + def _execute_procedure(*args, **kwargs): + try: + ret = f(*args, **kwargs) + if isinstance(ret, Response): + return ret + + return Response(output=ret) + except Exception as e: + return Response(errors=[str(e)]) + + return _execute_procedure + + # vim:sw=4:ts=4:et: diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 16d1a002..e607877e 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -114,7 +114,6 @@ def get_decorators(cls, climb_class_hierarchy=False): def visit_FunctionDef(node): for n in node.decorator_list: - name = '' if isinstance(n, ast.Call): name = n.func.attr if isinstance(n.func, ast.Attribute) else n.func.id else: @@ -283,4 +282,20 @@ def grouper(n, iterable, fillvalue=None): yield filter(None, chunk) +def is_functional_procedure(obj) -> bool: + return callable(obj) and hasattr(obj, 'procedure') + + +def is_functional_hook(obj) -> bool: + return callable(obj) and hasattr(obj, 'hook') + + +def run(action, **kwargs): + from platypush.context import get_plugin + (module_name, method_name) = get_module_and_method_from_action(action) + plugin = get_plugin(module_name) + method = getattr(plugin, method_name) + return method(**kwargs) + + # vim:sw=4:ts=4:et: