Implemented interface for custom Python scripts, procedures and hooks [closes #131]

This commit is contained in:
Fabio Manganiello 2020-04-08 23:22:54 +02:00
parent 3e56666ba3
commit 0dae03551f
6 changed files with 187 additions and 59 deletions

View file

@ -1,3 +1,4 @@
import ast
import json
import logging
import threading
@ -34,10 +35,16 @@ class RedisBus(Bus):
try:
msg = self.redis.blpop(self.redis_queue)
if msg and msg[1]:
msg = Message.build(json.loads(msg[1].decode('utf-8')))
else:
msg = None
if not msg or msg[1] is None:
return
msg = msg[1].decode('utf-8')
try:
msg = json.loads(msg)
except json.decoder.JSONDecodeError:
msg = ast.literal_eval(msg)
msg = Message.build(msg)
except Exception as e:
logger.exception(e)
@ -49,4 +56,3 @@ class RedisBus(Bus):
# vim:sw=4:ts=4:et:

View file

@ -1,16 +1,20 @@
import datetime
import importlib
import inspect
import logging
import os
import pkgutil
import re
import socket
import sys
import yaml
from platypush.utils import get_hash
from platypush.utils import get_hash, is_functional_procedure, is_functional_hook
""" Config singleton instance """
_default_config_instance = None
class Config(object):
"""
Configuration base class
@ -57,7 +61,7 @@ class Config(object):
raise RuntimeError('No config file specified and nothing found in {}'
.format(self._cfgfile_locations))
self._cfgfile = cfgfile
self._cfgfile = os.path.abspath(os.path.expanduser(cfgfile))
self._config = self._read_config_file(self._cfgfile)
if 'token' in self._config:
@ -68,6 +72,15 @@ class Config(object):
self._config['workdir'] = self._workdir_location
os.makedirs(self._config['workdir'], exist_ok=True)
if 'scripts_dir' not in self._config:
self._config['scripts_dir'] = os.path.join(os.path.dirname(cfgfile), 'scripts')
os.makedirs(self._config['scripts_dir'], mode=0o755, exist_ok=True)
init_py = os.path.join(self._config['scripts_dir'], '__init__.py')
if not os.path.isfile(init_py):
with open(init_py, 'w') as f:
f.write('# Auto-generated __init__.py - do not remove\n')
self._config['db'] = self._config.get('main.db', {
'engine': 'sqlite:///' + os.path.join(
os.path.expanduser('~'), '.local', 'share', 'platypush', 'main.db')
@ -80,7 +93,7 @@ class Config(object):
}
if 'logging' in self._config:
for (k,v) in self._config['logging'].items():
for (k, v) in self._config['logging'].items():
if k == 'filename':
logfile = os.path.expanduser(v)
logdir = os.path.dirname(logfile)
@ -106,7 +119,7 @@ class Config(object):
self._config['device_id'] = socket.gethostname()
if 'environment' in self._config:
for k,v in self._config['environment'].items():
for k, v in self._config['environment'].items():
os.environ[k] = str(v)
self.backends = {}
@ -117,17 +130,18 @@ class Config(object):
self.cronjobs = {}
self._init_constants()
self._load_scripts()
self._init_components()
@staticmethod
def _is_special_token(token):
return token == 'main.db' or \
token == 'token' or \
token == 'token_hash' or \
token == 'logging' or \
token == 'workdir' or \
token == 'device_id' or \
token == 'environment'
token == 'token' or \
token == 'token_hash' or \
token == 'logging' or \
token == 'workdir' or \
token == 'device_id' or \
token == 'environment'
def _read_config_file(self, cfgfile):
cfgfile_dir = os.path.dirname(os.path.abspath(
@ -155,12 +169,44 @@ class Config(object):
included_config = self._read_config_file(include_file)
for incl_section in included_config.keys():
config[incl_section] = included_config[incl_section]
elif section == 'scripts_dir':
assert isinstance(file_config[section], str)
config['scripts_dir'] = os.path.abspath(os.path.expanduser(file_config[section]))
elif 'disabled' not in file_config[section] \
or file_config[section]['disabled'] is False:
config[section] = file_config[section]
return config
def _load_scripts(self):
scripts_dir = self._config['scripts_dir']
sys_path = sys.path.copy()
sys.path = [scripts_dir] + sys.path
for x, modname, y in pkgutil.walk_packages(path=[scripts_dir], onerror=lambda x: None):
try:
module = importlib.import_module(modname)
except Exception as e:
print('Unhandled exception while importing module {} from {}: {}'.format(
modname, scripts_dir, str(e)
))
continue
self.procedures.update(**{
modname + '.' + name: obj
for name, obj in inspect.getmembers(module)
if is_functional_procedure(obj)
})
self.event_hooks.update(**{
modname + '.' + name: obj
for name, obj in inspect.getmembers(module)
if is_functional_hook(obj)
})
sys.path = sys_path
def _init_components(self):
for key in self._config.keys():
if key.startswith('backend.'):
@ -199,38 +245,42 @@ class Config(object):
if 'constants' in self._config:
self.constants = self._config['constants']
for (key,value) in self._default_constants.items():
for (key, value) in self._default_constants.items():
self.constants[key] = value
@staticmethod
def get_backends():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
return _default_config_instance.backends
@staticmethod
def get_plugins():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
return _default_config_instance.plugins
@staticmethod
def get_event_hooks():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
return _default_config_instance.event_hooks
@staticmethod
def get_procedures():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
return _default_config_instance.procedures
@staticmethod
def get_constants():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
constants = {}
for name in _default_config_instance.constants.keys():
@ -240,16 +290,19 @@ class Config(object):
@staticmethod
def get_constant(name):
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
if name not in _default_config_instance.constants: return None
if name not in _default_config_instance.constants:
return None
value = _default_config_instance.constants[name]
return value() if callable(value) else value
@staticmethod
def get_cronjobs():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
return _default_config_instance.cronjobs
@staticmethod
@ -263,11 +316,11 @@ class Config(object):
return backends[0] if backends else None
@classmethod
def _get_default_cfgfile(cls):
for location in cls._cfgfile_locations:
if os.path.isfile(location): return location
if os.path.isfile(location):
return location
@staticmethod
def init(cfgfile=None):
@ -287,9 +340,8 @@ class Config(object):
key -- Config key to get
"""
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
if _default_config_instance is None:
_default_config_instance = Config()
return _default_config_instance._config.get(key)
# vim:sw=4:ts=4:et:

View file

@ -1,14 +1,14 @@
import copy
import json
import logging
import re
import threading
from functools import wraps
from platypush.config import Config
from platypush.message.event import Event, EventMatchResult
from platypush.message.event import Event
from platypush.message.request import Request
from platypush.procedure import Procedure
from platypush.utils import get_event_class_by_type, set_thread_name
from platypush.utils import get_event_class_by_type, set_thread_name, is_functional_hook
logger = logging.getLogger(__name__)
@ -22,7 +22,7 @@ def parse(msg):
if isinstance(msg, str):
try:
msg = json.loads(msg.strip())
except:
except json.JSONDecodeError:
logger.warning('Invalid JSON message: {}'.format(msg))
return None
@ -57,10 +57,12 @@ class EventCondition(object):
""" Builds a rule given either another EventRule, a dictionary or
a JSON UTF-8 encoded string/bytearray """
if isinstance(rule, cls): return rule
else: rule = parse(rule)
assert isinstance(rule, dict)
if isinstance(rule, cls):
return rule
else:
rule = parse(rule)
assert isinstance(rule, dict)
type = get_event_class_by_type(
rule.pop('type') if 'type' in rule else 'Event')
@ -76,11 +78,11 @@ class EventAction(Request):
whose fields can be configured later depending on the event context """
def __init__(self, target=None, action=None, **args):
if target is None: target=Config.get('device_id')
if target is None:
target = Config.get('device_id')
args_copy = copy.deepcopy(args)
super().__init__(target=target, action=action, **args_copy)
@classmethod
def build(cls, action):
action = super().parse(action)
@ -97,11 +99,11 @@ class EventAction(Request):
class EventHook(object):
""" Event hook class. It consists of one conditionss and
""" Event hook class. It consists of one conditions and
one or multiple actions to be executed """
def __init__(self, name, priority=None, condition=None, actions=[]):
""" Construtor. Takes a name, a EventCondition object and an event action
def __init__(self, name, priority=None, condition=None, actions=None):
""" Constructor. Takes a name, a EventCondition object and an event action
procedure as input. It may also have a priority attached
as a positive number. If multiple hooks match against an event,
only the ones that have either the maximum match score or the
@ -109,20 +111,25 @@ class EventHook(object):
self.name = name
self.condition = EventCondition.build(condition or {})
self.actions = actions
self.actions = actions or []
self.priority = priority or 0
self.condition.priority = self.priority
@classmethod
def build(cls, name, hook):
""" Builds a rule given either another EventRule, a dictionary or
a JSON UTF-8 encoded string/bytearray """
if isinstance(hook, cls): return hook
else: hook = parse(hook)
assert isinstance(hook, dict)
if isinstance(hook, cls):
return hook
else:
hook = parse(hook)
if is_functional_hook(hook):
actions = Procedure(name=name, requests=[hook], _async=False)
return cls(name=name, condition=hook.condition, actions=actions)
assert isinstance(hook, dict)
condition = EventCondition.build(hook['if']) if 'if' in hook else None
actions = []
priority = hook['priority'] if 'priority' in hook else None
@ -134,17 +141,15 @@ class EventHook(object):
else:
actions = [hook['then']]
actions = Procedure.build(name=name+'__Hook', requests=actions, _async=False)
actions = Procedure.build(name=name + '__Hook', requests=actions, _async=False)
return cls(name=name, condition=condition, actions=actions, priority=priority)
def matches_event(self, event):
""" Returns an EventMatchResult object containing the information
about the match between the event and this hook """
return event.matches_condition(self.condition)
def run(self, event):
""" Checks the condition of the hook against a particular event and
runs the hook actions if the condition is met """
@ -154,14 +159,30 @@ class EventHook(object):
self.actions.execute(event=event, **result.parsed_args)
result = self.matches_event(event)
token = Config.get('token')
if result.is_match:
logger.info('Running hook {} triggered by an event'.format(self.name))
threading.Thread(target=_thread_func,
name='Event-' + self.name,
args=(result,)).start()
threading.Thread(target=_thread_func, name='Event-' + self.name, args=(result,)).start()
def hook(f, event_type=Event, **condition):
f.hook = True
f.condition = EventCondition(type=event_type, **condition)
@wraps(f)
def _execute_hook(*args, **kwargs):
from platypush import Response
try:
ret = f(*args, **kwargs)
if isinstance(ret, Response):
return ret
return Response(output=ret)
except Exception as e:
return Response(errors=[str(e)])
return _execute_hook
# vim:sw=4:ts=4:et:

View file

@ -12,7 +12,8 @@ from platypush.config import Config
from platypush.context import get_plugin
from platypush.message import Message
from platypush.message.response import Response
from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message
from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \
is_functional_procedure
logger = logging.getLogger(__name__)
@ -69,8 +70,19 @@ class Request(Message):
from platypush.procedure import Procedure
logger.info('Executing procedure request: {}'.format(self.action))
proc_name = self.action.split('.')[-1]
proc_config = Config.get_procedures()[proc_name]
procedures = Config.get_procedures()
proc_name = '.'.join(self.action.split('.')[1:])
if proc_name not in procedures:
proc_name = self.action.split('.')[-1]
proc_config = procedures[proc_name]
if is_functional_procedure(proc_config):
kwargs.update(**self.args)
if 'n_tries' in kwargs:
del kwargs['n_tries']
return proc_config(*args, **kwargs)
proc = Procedure.build(name=proc_name, requests=proc_config['actions'],
_async=proc_config['_async'], args=self.args,
backend=self.backend, id=self.id)

View file

@ -1,6 +1,7 @@
import enum
import logging
import re
from functools import wraps
from queue import LifoQueue
from ..config import Config
@ -179,6 +180,10 @@ class Procedure(object):
token = Config.get('token')
for request in self.requests:
if callable(request):
response = request(**context)
continue
if isinstance(request, Statement):
if request == Statement.RETURN:
self._should_return = True
@ -461,4 +466,21 @@ class IfProcedure(Procedure):
return response
def procedure(f):
f.procedure = True
@wraps(f)
def _execute_procedure(*args, **kwargs):
try:
ret = f(*args, **kwargs)
if isinstance(ret, Response):
return ret
return Response(output=ret)
except Exception as e:
return Response(errors=[str(e)])
return _execute_procedure
# vim:sw=4:ts=4:et:

View file

@ -114,7 +114,6 @@ def get_decorators(cls, climb_class_hierarchy=False):
def visit_FunctionDef(node):
for n in node.decorator_list:
name = ''
if isinstance(n, ast.Call):
name = n.func.attr if isinstance(n.func, ast.Attribute) else n.func.id
else:
@ -283,4 +282,20 @@ def grouper(n, iterable, fillvalue=None):
yield filter(None, chunk)
def is_functional_procedure(obj) -> bool:
return callable(obj) and hasattr(obj, 'procedure')
def is_functional_hook(obj) -> bool:
return callable(obj) and hasattr(obj, 'hook')
def run(action, **kwargs):
from platypush.context import get_plugin
(module_name, method_name) = get_module_and_method_from_action(action)
plugin = get_plugin(module_name)
method = getattr(plugin, method_name)
return method(**kwargs)
# vim:sw=4:ts=4:et: