diff --git a/docs/source/conf.py b/docs/source/conf.py index 016f9a5fc0..18126f9819 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -225,6 +225,7 @@ autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers', 'picamera', 'pwm3901', 'PIL', + 'croniter', ] sys.path.insert(0, os.path.abspath('../..')) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 5b67b7da7a..121aec5f3b 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -1,8 +1,9 @@ -import datetime +import enum import logging -import re import time +import croniter + from threading import Thread from platypush.procedure import Procedure @@ -10,76 +11,79 @@ from platypush.procedure import Procedure logger = logging.getLogger(__name__) +class CronjobState(enum.IntEnum): + IDLE = 0 + WAIT = 1 + RUNNING = 2 + DONE = 3 + ERROR = 4 + + class Cronjob(Thread): - def __init__(self, name, cron_expression, actions, *args, **kwargs): + def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression self.name = name - self.actions = Procedure.build(name=name+'__Cron', _async=False, + self.state = CronjobState.IDLE + self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) - def run(self): - logger.info('Running cronjob {}'.format(self.name)) - response = None - context = {} - response = self.actions.execute(_async=False, **context) - logger.info('Response from cronjob {}: {}'.format(self.name, response)) + self.state = CronjobState.WAIT + self.wait() + self.state = CronjobState.RUNNING + try: + logger.info('Running cronjob {}'.format(self.name)) + context = {} + response = self.actions.execute(_async=False, **context) + logger.info('Response from cronjob {}: {}'.format(self.name, response)) + self.state = CronjobState.DONE + except Exception as e: + logger.exception(e) + self.state = CronjobState.ERROR + + def wait(self): + now = int(time.time()) + cron = croniter.croniter(self.cron_expression, now) + next_run = int(cron.get_next()) + time.sleep(next_run - now) def should_run(self): - units = ('minute', 'hour', 'day', 'month', 'year') - now = datetime.datetime.fromtimestamp(time.time()) - cron_units = re.split('\s+', self.cron_expression) - - for i in range(0, len(units)): - unit = units[i] - now_unit = getattr(now, unit) - cron_unit = cron_units[i].replace('*', str(now_unit)) - m = re.match('(\d+)(/(\d+))?', cron_unit) - - if m.group(3): - if int(m.group(1)) % int(m.group(3)): - return False - elif m: - if int(m.group(1)) != now_unit: - return False - else: - raise RuntimeError('Invalid cron expression for job {}: {}'. - format(self.name, self.cron_expression)) - - return True + now = int(time.time()) + cron = croniter.croniter(self.cron_expression, now) + next_run = int(cron.get_next()) + return now == next_run class CronScheduler(Thread): - def __init__(self, jobs, *args, **kwargs): + def __init__(self, jobs): super().__init__() self.jobs_config = jobs - logger.info('Cron scheduler initialized with {} jobs' - .format(len(self.jobs_config.keys()))) + self._jobs = {} + logger.info('Cron scheduler initialized with {} jobs'. + format(len(self.jobs_config.keys()))) + def _get_job(self, name, config): + job = self._jobs.get(name) + if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]: + return job - @classmethod - def _build_job(cls, name, config): - if isinstance(config, dict): - job = Cronjob(name=name, cron_expression=config['cron_expression'], - actions=config['actions']) - - assert isinstance(job, Cronjob) - return job + self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'], + actions=config['actions']) + return self._jobs[name] def run(self): logger.info('Running cron scheduler') while True: for (job_name, job_config) in self.jobs_config.items(): - job = self._build_job(name=job_name, config=job_config) - if job.should_run(): + job = self._get_job(name=job_name, config=job_config) + if job.state == CronjobState.IDLE: job.start() - time.sleep(60) + time.sleep(0.5) # vim:sw=4:ts=4:et: - diff --git a/requirements.txt b/requirements.txt index 8c9539807f..cd3e35bfe5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -172,3 +172,6 @@ pyScss # Support for machine learning CV plugin # cv2 # numpy + +# Support for cronjobs +croniter diff --git a/setup.py b/setup.py index e956e8c67e..ffe714b3c8 100755 --- a/setup.py +++ b/setup.py @@ -133,6 +133,7 @@ setup( 'pyyaml', 'redis', 'requests', + 'croniter', ], extras_require = { 'Support for custom thread and process names': ['python-prctl'],