From 14b511034f3266336d554205e98ad92f2ee5686f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 15 Jan 2018 22:36:24 +0100 Subject: [PATCH] Support for cron actions, solves #47 --- platypush/__init__.py | 4 ++ platypush/config/__init__.py | 18 +++++- platypush/cron/__init__.py | 0 platypush/cron/scheduler.py | 86 +++++++++++++++++++++++++++ platypush/message/request/__init__.py | 6 ++ 5 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 platypush/cron/__init__.py create mode 100644 platypush/cron/scheduler.py diff --git a/platypush/__init__.py b/platypush/__init__.py index e646dc88..97bd68b7 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -8,6 +8,7 @@ from threading import Thread from .bus import Bus from .config import Config from .context import register_backends +from .cron.scheduler import CronScheduler from .event.processor import EventProcessor from .message.event import Event, StopEvent from .message.request import Request @@ -110,6 +111,9 @@ class Daemon(object): for backend in self.backends.values(): backend.start() + # Start the cron scheduler + CronScheduler(jobs=Config.get_cronjobs()).start() + # Poll for messages on the bus try: self.bus.poll() diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index b02ab686..359bc494 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -34,8 +34,8 @@ class Config(object): ] _default_constants = { - 'today': datetime.date.today().isoformat, - 'now': datetime.datetime.now().isoformat, + 'today': datetime.date.today, + 'now': datetime.datetime.now, } _workdir_location = os.path.join(os.environ['HOME'], '.local', 'share', 'platypush') @@ -76,8 +76,10 @@ class Config(object): self.event_hooks = {} self.procedures = {} self.constants = {} + self.cronjobs = [] self._init_constants() + self._init_cronjobs() self._init_components() @@ -133,6 +135,12 @@ class Config(object): self.constants[key] = value + def _init_cronjobs(self): + if 'cron' in self._config: + for job in self._config['cron']['jobs']: + self.cronjobs.append(job) + + @staticmethod def get_backends(): global _default_config_instance @@ -176,6 +184,12 @@ class Config(object): value = _default_config_instance.constants[name] return value() if callable(value) else value + @staticmethod + def get_cronjobs(): + global _default_config_instance + if _default_config_instance is None: _default_config_instance = Config() + return _default_config_instance.cronjobs + @staticmethod def get_default_pusher_backend(): """ diff --git a/platypush/cron/__init__.py b/platypush/cron/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py new file mode 100644 index 00000000..bc21be32 --- /dev/null +++ b/platypush/cron/scheduler.py @@ -0,0 +1,86 @@ +import datetime +import logging +import re +import time + +from threading import Thread + +from platypush.event.hook import EventAction + +class Cronjob(Thread): + def __init__(self, name, cron_expression, actions, *args, **kwargs): + super().__init__() + self.cron_expression = cron_expression + self.actions = [] + + for action in actions: + self.actions.append(EventAction.build(action)) + + + def run(self): + logging.info('Running cronjob {}'.format(self.name)) + response = None + context = {} + + for action in self.actions: + response = action.execute(async=False, **context) + logging.info('Response from cronjob {}: {}'.format(self.name, response)) + + + def should_run(self): + units = ('minute', 'hour', 'day', 'month', 'year') + now = datetime.datetime.fromtimestamp(time.time()) + cron_units = re.split('\s+', self.cron_expression) + + for i in range(0, len(units)): + unit = units[i] + now_unit = getattr(now, unit) + cron_unit = cron_units[i].replace('*', str(now_unit)) + m = re.match('(\d+)(/(\d+))?', cron_unit) + + if m.group(3): + if int(m.group(1)) % int(m.group(3)): + return False + elif m: + if int(m.group(1)) != now_unit: + return False + else: + raise RuntimeError('Invalid cron expression for job {}: {}'. + format(self.name, self.cron_expression)) + + return True + + +class CronScheduler(Thread): + def __init__(self, jobs, *args, **kwargs): + super().__init__() + self.jobs_config = jobs + logging.info('Cron scheduler initialized with {} jobs' + .format(len(self.jobs_config))) + + + @classmethod + def _build_job(cls, job_config): + if isinstance(job_config, dict): + job = Cronjob(cron_expression=job_config['cron_expression'], + name=job_config['name'], + actions=job_config['actions']) + + assert isinstance(job, Cronjob) + return job + + + def run(self): + logging.info('Running cron scheduler') + + while True: + for job_config in self.jobs_config: + job = self._build_job(job_config) + if job.should_run(): + job.start() + + time.sleep(60) + + +# vim:sw=4:ts=4:et: + diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index bc8d7bea..fb2325d9 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -1,4 +1,5 @@ import copy +import datetime import json import logging import random @@ -114,6 +115,11 @@ class Request(Message): try: context_value = eval("context['{}']{}".format( context_argname, path if path else '')) + + if callable(context_value): + context_value = context_value() + if isinstance(context_value, datetime.date): + context_value = context_value.isoformat() except: context_value = expr parsed_value += prefix + (