forked from platypush/platypush
Support for Python cronjobs in scripts folder - closes #156
This commit is contained in:
parent
37e006d86e
commit
1c84659e34
4 changed files with 63 additions and 6 deletions
|
@ -12,7 +12,7 @@ from typing import Optional
|
||||||
|
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from platypush.utils import get_hash, is_functional_procedure, is_functional_hook
|
from platypush.utils import get_hash, is_functional_procedure, is_functional_hook, is_functional_cron
|
||||||
|
|
||||||
""" Config singleton instance """
|
""" Config singleton instance """
|
||||||
_default_config_instance = None
|
_default_config_instance = None
|
||||||
|
@ -206,6 +206,12 @@ class Config(object):
|
||||||
if is_functional_hook(obj)
|
if is_functional_hook(obj)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
self.cronjobs.update(**{
|
||||||
|
prefix + name: obj
|
||||||
|
for name, obj in inspect.getmembers(module)
|
||||||
|
if is_functional_cron(obj)
|
||||||
|
})
|
||||||
|
|
||||||
def _load_scripts(self):
|
def _load_scripts(self):
|
||||||
scripts_dir = self._config['scripts_dir']
|
scripts_dir = self._config['scripts_dir']
|
||||||
sys_path = sys.path.copy()
|
sys_path = sys.path.copy()
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
from functools import wraps
|
||||||
|
from logging import getLogger
|
||||||
|
|
||||||
|
logger = getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def cron(cron_expression: str):
|
||||||
|
def wrapper(f):
|
||||||
|
f.cron = True
|
||||||
|
f.cron_expression = cron_expression
|
||||||
|
|
||||||
|
@wraps(f)
|
||||||
|
def wrapped(*args, **kwargs):
|
||||||
|
from platypush import Response
|
||||||
|
|
||||||
|
try:
|
||||||
|
ret = f(*args, **kwargs)
|
||||||
|
if isinstance(ret, Response):
|
||||||
|
return ret
|
||||||
|
|
||||||
|
return Response(output=ret)
|
||||||
|
except Exception as e:
|
||||||
|
logger.exception(e)
|
||||||
|
return Response(errors=[str(e)])
|
||||||
|
|
||||||
|
return wrapped
|
||||||
|
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
# vim:sw=4:ts=4:et:
|
|
@ -7,6 +7,7 @@ import croniter
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from platypush.procedure import Procedure
|
from platypush.procedure import Procedure
|
||||||
|
from platypush.utils import is_functional_cron
|
||||||
|
|
||||||
logger = logging.getLogger('platypush:cron')
|
logger = logging.getLogger('platypush:cron')
|
||||||
|
|
||||||
|
@ -25,8 +26,11 @@ class Cronjob(Thread):
|
||||||
self.cron_expression = cron_expression
|
self.cron_expression = cron_expression
|
||||||
self.name = name
|
self.name = name
|
||||||
self.state = CronjobState.IDLE
|
self.state = CronjobState.IDLE
|
||||||
self.actions = Procedure.build(name=name + '__Cron', _async=False,
|
|
||||||
requests=actions)
|
if isinstance(actions, dict):
|
||||||
|
self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions)
|
||||||
|
else:
|
||||||
|
self.actions = actions
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.state = CronjobState.WAIT
|
self.state = CronjobState.WAIT
|
||||||
|
@ -36,7 +40,12 @@ class Cronjob(Thread):
|
||||||
try:
|
try:
|
||||||
logger.info('Running cronjob {}'.format(self.name))
|
logger.info('Running cronjob {}'.format(self.name))
|
||||||
context = {}
|
context = {}
|
||||||
response = self.actions.execute(_async=False, **context)
|
|
||||||
|
if isinstance(self.actions, Procedure):
|
||||||
|
response = self.actions.execute(_async=False, **context)
|
||||||
|
else:
|
||||||
|
response = self.actions(**context)
|
||||||
|
|
||||||
logger.info('Response from cronjob {}: {}'.format(self.name, response))
|
logger.info('Response from cronjob {}: {}'.format(self.name, response))
|
||||||
self.state = CronjobState.DONE
|
self.state = CronjobState.DONE
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -69,8 +78,15 @@ class CronScheduler(Thread):
|
||||||
if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]:
|
if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]:
|
||||||
return job
|
return job
|
||||||
|
|
||||||
self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'],
|
if isinstance(config, dict):
|
||||||
actions=config['actions'])
|
self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'],
|
||||||
|
actions=config['actions'])
|
||||||
|
elif is_functional_cron(config):
|
||||||
|
self._jobs[name] = Cronjob(name=name, cron_expression=config.cron_expression,
|
||||||
|
actions=config)
|
||||||
|
else:
|
||||||
|
raise AssertionError('Expected type dict or function for cron {}, got {}'.format(
|
||||||
|
name, type(config)))
|
||||||
|
|
||||||
return self._jobs[name]
|
return self._jobs[name]
|
||||||
|
|
||||||
|
|
|
@ -325,6 +325,10 @@ def is_functional_hook(obj) -> bool:
|
||||||
return callable(obj) and hasattr(obj, 'hook')
|
return callable(obj) and hasattr(obj, 'hook')
|
||||||
|
|
||||||
|
|
||||||
|
def is_functional_cron(obj) -> bool:
|
||||||
|
return callable(obj) and hasattr(obj, 'cron') and hasattr(obj, 'cron_expression')
|
||||||
|
|
||||||
|
|
||||||
def run(action, *args, **kwargs):
|
def run(action, *args, **kwargs):
|
||||||
from platypush.context import get_plugin
|
from platypush.context import get_plugin
|
||||||
(module_name, method_name) = get_module_and_method_from_action(action)
|
(module_name, method_name) = get_module_and_method_from_action(action)
|
||||||
|
|
Loading…
Reference in a new issue