From d0d333e8f4679e883e5c0e044b4ee669cec7aab6 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 20 Dec 2022 23:01:03 +0100 Subject: [PATCH 1/2] FIX: Clear the cronjob event after receiving a TIME_SYNC. When a cronjob receives a TIME_SYNC event (because the system clock has changed/drifted and the cronjobs are expected to recalculate their next run slot) it should also clear the event. Otherwise, the next `wait` will be skipped and the cronjob will be executed even if it wasn't scheduled. --- platypush/cron/scheduler.py | 73 ++++++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 18f1ee8bd..445874aae 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -2,6 +2,7 @@ import datetime import enum import logging import threading +from typing import Dict import croniter from dateutil.tz import gettz @@ -22,6 +23,10 @@ def get_now() -> datetime.datetime: class CronjobState(enum.IntEnum): + """ + An enum used to model the possible states of a cronjob. + """ + IDLE = 0 WAIT = 1 RUNNING = 2 @@ -30,12 +35,22 @@ class CronjobState(enum.IntEnum): class CronjobEvent(enum.IntEnum): + """ + A list of events used to synchronize with a cronjob. + """ + NONE = 0 STOP = 1 TIME_SYNC = 2 class Cronjob(threading.Thread): + """ + Representation of a cronjob. The inner logic is wrapped by a thread that + waits until the next execution slot, and can quickly synchronize in case of + clock change/drift. + """ + def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression @@ -53,13 +68,23 @@ class Cronjob(threading.Thread): self.actions = actions def notify(self, event: CronjobEvent): + """ + Send an event to this cronjob. + """ with self._event_lock: self._event_type = event self._event.set() def run(self): + """ + Inner logic of the cronjob thread. + """ set_thread_name(f'cron:{self.name}') + + # Wait until an event is received or the next execution slot is reached self.wait() + + # Early exit if we received a stop event if self.should_stop(): return @@ -70,8 +95,10 @@ class Cronjob(threading.Thread): context = {} if isinstance(self.actions, Procedure): + # If the cronjob wraps a procedure, execute it response = self.actions.execute(_async=False, **context) else: + # Otherwise, execute the scheduled actions one by one response = self.actions(**context) logger.info('Response from cronjob {}: {}'.format(self.name, response)) @@ -81,31 +108,48 @@ class Cronjob(threading.Thread): self.state = CronjobState.ERROR def wait(self): + """ + Wait until the next slot is reached. + """ + # Set the cronjob in WAIT state with self._event_lock: self.state = CronjobState.WAIT self._event.clear() self._event_type = CronjobEvent.TIME_SYNC + # Keep iterating until it's our time to run. If we receive clock + # synchronization events, the cronjob updates its next expected run and + # keeps waiting. while self._event_type == CronjobEvent.TIME_SYNC: - now = get_now() self._event_type = CronjobEvent.NONE - cron = croniter.croniter(self.cron_expression, now) - next_run = cron.get_next(datetime.datetime) - self._event.wait(max(0, (next_run - now).total_seconds())) + next_run = self._get_next_run_secs() + self._event.wait(next_run) - def stop(self): - self._event_type = CronjobEvent.STOP - self._event.set() + with self._event_lock: + self._event.clear() + + def _get_next_run_secs(self) -> int: + """ + Get the number of seconds between now and the next scheduled run. + """ + now = get_now() + cron = croniter.croniter(self.cron_expression, now) + next_run = cron.get_next(datetime.datetime) + return max(0, (next_run - now).total_seconds()) def should_stop(self): return self._event_type == CronjobEvent.STOP class CronScheduler(threading.Thread): + """ + Main cron scheduler job. + """ + def __init__(self, jobs, poll_seconds: float = 0.5): super().__init__() self.jobs_config = jobs - self._jobs = {} + self._jobs: Dict[str, Cronjob] = {} self._poll_seconds = max(1e-3, poll_seconds) self._should_stop = threading.Event() logger.info( @@ -114,18 +158,24 @@ class CronScheduler(threading.Thread): ) ) - def _get_job(self, name, config): + def _get_job(self, name, config) -> Cronjob: + """ + Get a cronjob by name. + """ + # Check if the cronjob has already been indexed. job = self._jobs.get(name) if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]: return job if isinstance(config, dict): + # If the cronjob is a static list of actions, initialize it from dict self._jobs[name] = Cronjob( name=name, cron_expression=config['cron_expression'], actions=config['actions'], ) elif is_functional_cron(config): + # Otherwise, initialize it as a native Python function self._jobs[name] = Cronjob( name=name, cron_expression=config.cron_expression, actions=config ) @@ -139,8 +189,11 @@ class CronScheduler(threading.Thread): return self._jobs[name] def stop(self): + """ + Stop the scheduler and send a STOP signal to all the registered cronjobs. + """ for job in self._jobs.values(): - job.stop() + job.notify(CronjobEvent.STOP) self._should_stop.set() def should_stop(self): From 84ce31cab0ff857d75ccdf7b28c6ab79bbf4e480 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 20 Dec 2022 23:05:22 +0100 Subject: [PATCH 2/2] =?UTF-8?q?Bump=20version:=200.24.3=20=E2=86=92=200.24?= =?UTF-8?q?.4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CHANGELOG.md | 7 +++++++ platypush/__init__.py | 2 +- setup.cfg | 2 +- setup.py | 2 +- 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c543f254..e30952909 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file. Given the high speed of development in the first phase, changes are being reported only starting from v0.20.2. +## [0.24.4] - 2022-12-20 + +### Fixed + +- Fixed cronjobs potentially being triggered even if it wasn't their slot in + case of clock synchronization events. + ## [0.24.3] - 2022-12-17 ### Added diff --git a/platypush/__init__.py b/platypush/__init__.py index bb9970e85..e521363e8 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -23,7 +23,7 @@ from .message.response import Response from .utils import set_thread_name, get_enabled_plugins __author__ = 'Fabio Manganiello ' -__version__ = '0.24.3' +__version__ = '0.24.4' logger = logging.getLogger('platypush') diff --git a/setup.cfg b/setup.cfg index 43af87932..03298446c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.24.3 +current_version = 0.24.4 commit = True tag = True diff --git a/setup.py b/setup.py index 7a1efbc42..e90316220 100755 --- a/setup.py +++ b/setup.py @@ -28,7 +28,7 @@ backend = pkg_files('platypush/backend') setup( name="platypush", - version="0.24.3", + version="0.24.4", author="Fabio Manganiello", author_email="info@fabiomanganiello.com", description="Platypush service",