diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 735955bb4..74eaaae5e 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -7,11 +7,20 @@ import croniter from dateutil.tz import gettz from platypush.procedure import Procedure -from platypush.utils import is_functional_cron +from platypush.utils import is_functional_cron, set_thread_name logger = logging.getLogger('platypush:cron') +def get_now() -> datetime.datetime: + """ + :return: A timezone-aware representation of `now` + """ + return datetime.datetime.now().replace( + tzinfo=gettz() + ) # lgtm [py/call-to-non-callable] + + class CronjobState(enum.IntEnum): IDLE = 0 WAIT = 1 @@ -20,21 +29,36 @@ class CronjobState(enum.IntEnum): ERROR = 4 +class CronjobEvent(enum.IntEnum): + NONE = 0 + STOP = 1 + TIME_SYNC = 2 + + class Cronjob(threading.Thread): def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression self.name = name self.state = CronjobState.IDLE - self._should_stop = threading.Event() + self._event = threading.Event() + self._event_type = CronjobEvent.NONE + self._event_lock = threading.RLock() - if isinstance(actions, dict) or isinstance(actions, list): - self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) + if isinstance(actions, (list, dict)): + self.actions = Procedure.build( + name=name + '__Cron', _async=False, requests=actions + ) else: self.actions = actions + def notify(self, event: CronjobEvent): + with self._event_lock: + self._event_type = event + self._event.set() + def run(self): - self.state = CronjobState.WAIT + set_thread_name(f'cron:{self.name}') self.wait() if self.should_stop(): return @@ -57,26 +81,38 @@ class Cronjob(threading.Thread): self.state = CronjobState.ERROR def wait(self): - now = datetime.datetime.now().replace(tzinfo=gettz()) # lgtm [py/call-to-non-callable] - cron = croniter.croniter(self.cron_expression, now) - next_run = cron.get_next() - self._should_stop.wait(next_run - now.timestamp()) + with self._event_lock: + self.state = CronjobState.WAIT + self._event.clear() + self._event_type = CronjobEvent.TIME_SYNC + + while self._event_type == CronjobEvent.TIME_SYNC: + now = get_now() + self._event_type = CronjobEvent.NONE + cron = croniter.croniter(self.cron_expression, now) + next_run = cron.get_next() + self._event.wait(max(0, next_run - now.timestamp())) def stop(self): - self._should_stop.set() + self._event_type = CronjobEvent.STOP + self._event.set() def should_stop(self): - return self._should_stop.is_set() + return self._event_type == CronjobEvent.STOP class CronScheduler(threading.Thread): - def __init__(self, jobs): + def __init__(self, jobs, poll_seconds: float = 0.5): super().__init__() self.jobs_config = jobs self._jobs = {} + self._poll_seconds = max(1e-3, poll_seconds) self._should_stop = threading.Event() - logger.info('Cron scheduler initialized with {} jobs'. - format(len(self.jobs_config.keys()))) + logger.info( + 'Cron scheduler initialized with {} jobs'.format( + len(self.jobs_config.keys()) + ) + ) def _get_job(self, name, config): job = self._jobs.get(name) @@ -84,14 +120,21 @@ class CronScheduler(threading.Thread): return job if isinstance(config, dict): - self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'], - actions=config['actions']) + self._jobs[name] = Cronjob( + name=name, + cron_expression=config['cron_expression'], + actions=config['actions'], + ) elif is_functional_cron(config): - self._jobs[name] = Cronjob(name=name, cron_expression=config.cron_expression, - actions=config) + self._jobs[name] = Cronjob( + name=name, cron_expression=config.cron_expression, actions=config + ) else: - raise AssertionError('Expected type dict or function for cron {}, got {}'.format( - name, type(config))) + raise AssertionError( + 'Expected type dict or function for cron {}, got {}'.format( + name, type(config) + ) + ) return self._jobs[name] @@ -112,7 +155,22 @@ class CronScheduler(threading.Thread): if job.state == CronjobState.IDLE: job.start() - self._should_stop.wait(timeout=0.5) + t_before_wait = get_now().timestamp() + self._should_stop.wait(timeout=self._poll_seconds) + t_after_wait = get_now().timestamp() + time_drift = abs(t_after_wait - t_before_wait) - self._poll_seconds + + if not self.should_stop() and time_drift > 1: + # If the system clock has been adjusted by more than one second + # (e.g. because of DST change or NTP sync) then ensure that the + # registered cronjobs are synchronized with the new datetime + logger.info( + 'System clock drift detected: %f secs. Synchronizing the cronjobs', + time_drift, + ) + + for job in self._jobs.values(): + job.notify(CronjobEvent.TIME_SYNC) logger.info('Terminating cron scheduler') diff --git a/tests/etc/scripts/test_cron.py b/tests/etc/scripts/test_cron.py index 6618dcb3f..172dc363d 100644 --- a/tests/etc/scripts/test_cron.py +++ b/tests/etc/scripts/test_cron.py @@ -2,25 +2,34 @@ import datetime from platypush.cron import cron -from tests.test_cron import tmp_files, tmp_files_ready, \ - test_timeout, expected_cron_file_content +from tests.test_cron import test_timeout, cron_queue + + +def make_cron_expr(cron_time: datetime.datetime): + return '{min} {hour} {day} {month} * {sec}'.format( + min=cron_time.minute, + hour=cron_time.hour, + day=cron_time.day, + month=cron_time.month, + sec=cron_time.second, + ) + # Prepare a cronjob that should start test_timeout/2 seconds from the application start -cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout/2) -cron_expr = '{min} {hour} {day} {month} * {sec}'.format( - min=cron_time.minute, hour=cron_time.hour, day=cron_time.day, - month=cron_time.month, sec=cron_time.second) +cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout / 2) -@cron(cron_expr) +@cron(make_cron_expr(cron_time)) def cron_test(**_): - """ - Simple cronjob that awaits for ``../test_cron.py`` to be ready and writes the expected - content to the monitored temporary file. - """ - files_ready = tmp_files_ready.wait(timeout=test_timeout) - assert files_ready, \ - 'The test did not prepare the temporary files within {} seconds'.format(test_timeout) + cron_queue.put('cron_test') - with open(tmp_files[0], 'w') as f: - f.write(expected_cron_file_content) + +# Prepare another cronjob that should start 1hr + test_timeout/2 seconds from the application start +cron_time = datetime.datetime.now() + datetime.timedelta( + hours=1, seconds=test_timeout / 2 +) + + +@cron(make_cron_expr(cron_time)) +def cron_1hr_test(**_): + cron_queue.put('cron_1hr_test') diff --git a/tests/test_cron.py b/tests/test_cron.py index c7a76a873..5bc41e636 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -1,43 +1,61 @@ -import os +import datetime +import queue import pytest -import tempfile -import threading import time -tmp_files = [] -tmp_files_ready = threading.Event() +from dateutil.tz import gettz +from mock import patch + test_timeout = 10 -expected_cron_file_content = 'The cronjob ran successfully!' +cron_queue = queue.Queue() -@pytest.fixture(scope='module', autouse=True) -def tmp_file(*_): - tmp_file = tempfile.NamedTemporaryFile(prefix='platypush-test-cron-', - suffix='.txt', delete=False) - tmp_files.append(tmp_file.name) - tmp_files_ready.set() - yield tmp_file.name +class MockDatetime(datetime.datetime): + timedelta = datetime.timedelta() - for f in tmp_files: - if os.path.isfile(f): - os.unlink(f) + @classmethod + def now(cls): + return super().now(tz=gettz()) + cls.timedelta -def test_cron_execution(tmp_file): +def _test_cron_queue(expected_msg: str): + msg = None + test_start = time.time() + while time.time() - test_start <= test_timeout and msg != expected_msg: + try: + msg = cron_queue.get(block=True, timeout=test_timeout) + except queue.Empty: + break + + assert msg == expected_msg, 'The expected cronjob has not been executed' + + +def test_cron_execution(): """ Test that the cronjob in ``../etc/scripts/test_cron.py`` runs successfully. """ - actual_cron_file_content = None - test_start = time.time() + _test_cron_queue('cron_test') - while actual_cron_file_content != expected_cron_file_content and \ - time.time() - test_start < test_timeout: - with open(tmp_file, 'r') as f: - actual_cron_file_content = f.read() - time.sleep(0.5) - assert actual_cron_file_content == expected_cron_file_content, \ - 'cron_test failed to run within {} seconds'.format(test_timeout) +def test_cron_execution_upon_system_clock_change(): + """ + Test that the cronjob runs at the right time even upon DST or other + system clock changes. + """ + # Mock datetime.datetime with a class that has overridable timedelta + patcher = patch('datetime.datetime', MockDatetime) + + try: + patcher.start() + time.sleep(1) + # Simulate a +1hr shift on the system clock + MockDatetime.timedelta = datetime.timedelta(hours=1) + time.sleep(1) + finally: + patcher.stop() + + # Ensure that the cronjob that was supposed to run in an hour is now running + _test_cron_queue('cron_1hr_test') if __name__ == '__main__':