From d0d333e8f4679e883e5c0e044b4ee669cec7aab6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 20 Dec 2022 23:01:03 +0100 Subject: [PATCH] FIX: Clear the cronjob event after receiving a TIME_SYNC. When a cronjob receives a TIME_SYNC event (because the system clock has changed/drifted and the cronjobs are expected to recalculate their next run slot) it should also clear the event. Otherwise, the next `wait` will be skipped and the cronjob will be executed even if it wasn't scheduled. --- platypush/cron/scheduler.py | 73 ++++++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 18f1ee8b..445874aa 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -2,6 +2,7 @@ import datetime import enum import logging import threading +from typing import Dict import croniter from dateutil.tz import gettz @@ -22,6 +23,10 @@ def get_now() -> datetime.datetime: class CronjobState(enum.IntEnum): + """ + An enum used to model the possible states of a cronjob. + """ + IDLE = 0 WAIT = 1 RUNNING = 2 @@ -30,12 +35,22 @@ class CronjobState(enum.IntEnum): class CronjobEvent(enum.IntEnum): + """ + A list of events used to synchronize with a cronjob. + """ + NONE = 0 STOP = 1 TIME_SYNC = 2 class Cronjob(threading.Thread): + """ + Representation of a cronjob. The inner logic is wrapped by a thread that + waits until the next execution slot, and can quickly synchronize in case of + clock change/drift. + """ + def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression @@ -53,13 +68,23 @@ class Cronjob(threading.Thread): self.actions = actions def notify(self, event: CronjobEvent): + """ + Send an event to this cronjob. + """ with self._event_lock: self._event_type = event self._event.set() def run(self): + """ + Inner logic of the cronjob thread. + """ set_thread_name(f'cron:{self.name}') + + # Wait until an event is received or the next execution slot is reached self.wait() + + # Early exit if we received a stop event if self.should_stop(): return @@ -70,8 +95,10 @@ class Cronjob(threading.Thread): context = {} if isinstance(self.actions, Procedure): + # If the cronjob wraps a procedure, execute it response = self.actions.execute(_async=False, **context) else: + # Otherwise, execute the scheduled actions one by one response = self.actions(**context) logger.info('Response from cronjob {}: {}'.format(self.name, response)) @@ -81,31 +108,48 @@ class Cronjob(threading.Thread): self.state = CronjobState.ERROR def wait(self): + """ + Wait until the next slot is reached. + """ + # Set the cronjob in WAIT state with self._event_lock: self.state = CronjobState.WAIT self._event.clear() self._event_type = CronjobEvent.TIME_SYNC + # Keep iterating until it's our time to run. If we receive clock + # synchronization events, the cronjob updates its next expected run and + # keeps waiting. while self._event_type == CronjobEvent.TIME_SYNC: - now = get_now() self._event_type = CronjobEvent.NONE - cron = croniter.croniter(self.cron_expression, now) - next_run = cron.get_next(datetime.datetime) - self._event.wait(max(0, (next_run - now).total_seconds())) + next_run = self._get_next_run_secs() + self._event.wait(next_run) - def stop(self): - self._event_type = CronjobEvent.STOP - self._event.set() + with self._event_lock: + self._event.clear() + + def _get_next_run_secs(self) -> int: + """ + Get the number of seconds between now and the next scheduled run. + """ + now = get_now() + cron = croniter.croniter(self.cron_expression, now) + next_run = cron.get_next(datetime.datetime) + return max(0, (next_run - now).total_seconds()) def should_stop(self): return self._event_type == CronjobEvent.STOP class CronScheduler(threading.Thread): + """ + Main cron scheduler job. + """ + def __init__(self, jobs, poll_seconds: float = 0.5): super().__init__() self.jobs_config = jobs - self._jobs = {} + self._jobs: Dict[str, Cronjob] = {} self._poll_seconds = max(1e-3, poll_seconds) self._should_stop = threading.Event() logger.info( @@ -114,18 +158,24 @@ class CronScheduler(threading.Thread): ) ) - def _get_job(self, name, config): + def _get_job(self, name, config) -> Cronjob: + """ + Get a cronjob by name. + """ + # Check if the cronjob has already been indexed. job = self._jobs.get(name) if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]: return job if isinstance(config, dict): + # If the cronjob is a static list of actions, initialize it from dict self._jobs[name] = Cronjob( name=name, cron_expression=config['cron_expression'], actions=config['actions'], ) elif is_functional_cron(config): + # Otherwise, initialize it as a native Python function self._jobs[name] = Cronjob( name=name, cron_expression=config.cron_expression, actions=config ) @@ -139,8 +189,11 @@ class CronScheduler(threading.Thread): return self._jobs[name] def stop(self): + """ + Stop the scheduler and send a STOP signal to all the registered cronjobs. + """ for job in self._jobs.values(): - job.stop() + job.notify(CronjobEvent.STOP) self._should_stop.set() def should_stop(self):