From 820a1c81846fc84ec572615fa2c2cd96f0bfb89d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 27 Apr 2022 23:25:14 +0200 Subject: [PATCH 1/3] Don't raise a pytest warning upon the asyncio "No event loop" warning --- platypush/context/__init__.py | 25 ++++++++++++++----------- pyproject.toml | 4 ++++ 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/platypush/context/__init__.py b/platypush/context/__init__.py index bcadf289a..0d6876376 100644 --- a/platypush/context/__init__.py +++ b/platypush/context/__init__.py @@ -26,14 +26,14 @@ main_bus = None def register_backends(bus=None, global_scope=False, **kwargs): - """ Initialize the backend objects based on the configuration and returns + """Initialize the backend objects based on the configuration and returns a name -> backend_instance map. Params: bus -- If specific (it usually should), the messages processed by the backends will be posted on this bus. kwargs -- Any additional key-value parameters required to initialize the backends - """ + """ global main_bus if bus: @@ -57,8 +57,7 @@ def register_backends(bus=None, global_scope=False, **kwargs): b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs) backends[name] = b except AttributeError as e: - logger.warning('No such class in {}: {}'.format( - module.__name__, cls_name)) + logger.warning('No such class in {}: {}'.format(module.__name__, cls_name)) raise RuntimeError(e) return backends @@ -74,15 +73,15 @@ def register_plugins(bus=None): def get_backend(name): - """ Returns the backend instance identified by name if it exists """ + """Returns the backend instance identified by name if it exists""" global backends return backends.get(name) def get_plugin(plugin_name, reload=False): - """ Registers a plugin instance by name if not registered already, or - returns the registered plugin instance""" + """Registers a plugin instance by name if not registered already, or + returns the registered plugin instance""" global plugins global plugins_init_locks @@ -104,8 +103,9 @@ def get_plugin(plugin_name, reload=False): cls_name += token.title() cls_name += 'Plugin' - plugin_conf = Config.get_plugins()[plugin_name] \ - if plugin_name in Config.get_plugins() else {} + plugin_conf = ( + Config.get_plugins()[plugin_name] if plugin_name in Config.get_plugins() else {} + ) if 'disabled' in plugin_conf: if plugin_conf['disabled'] is True: @@ -120,7 +120,9 @@ def get_plugin(plugin_name, reload=False): try: plugin_class = getattr(plugin, cls_name) except AttributeError as e: - logger.warning('No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e))) + logger.warning( + 'No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e)) + ) raise RuntimeError(e) with plugins_init_locks[plugin_name]: @@ -137,13 +139,14 @@ def get_bus() -> Bus: return main_bus from platypush.bus.redis import RedisBus + return RedisBus() def get_or_create_event_loop(): try: loop = asyncio.get_event_loop() - except RuntimeError: + except (DeprecationWarning, RuntimeError): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) diff --git a/pyproject.toml b/pyproject.toml index e7c6caf61..a5cd7464e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,3 +2,7 @@ skip-string-normalization = true skip-numeric-underscore-normalization = true +[tool.pytest.ini_options] +filterwarnings = [ + 'ignore:There is no current event loop:DeprecationWarning', +] From 41d0725ebf643c99e888fd112f4440e47b4c9ee7 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 28 Apr 2022 00:57:49 +0200 Subject: [PATCH 2/3] Fix for #217 The cron scheduler has been made more robust against changes in the system clock (caused by e.g. DST changes, NTP syncs or manual setting). A more granular management for cronjob events has been introduced, now supporting a `TIME_SYNC` event besides the usual `STOP`. When the cron scheduler detects a system clock drift (i.e. the timestamp offset before and after a blocking wait is >1 sec) then all the cronjobs are notified and forced to refresh their state. --- platypush/cron/scheduler.py | 100 ++++++++++++++++++++++++++------- tests/etc/scripts/test_cron.py | 41 ++++++++------ tests/test_cron.py | 70 ++++++++++++++--------- 3 files changed, 148 insertions(+), 63 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 735955bb4..74eaaae5e 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -7,11 +7,20 @@ import croniter from dateutil.tz import gettz from platypush.procedure import Procedure -from platypush.utils import is_functional_cron +from platypush.utils import is_functional_cron, set_thread_name logger = logging.getLogger('platypush:cron') +def get_now() -> datetime.datetime: + """ + :return: A timezone-aware representation of `now` + """ + return datetime.datetime.now().replace( + tzinfo=gettz() + ) # lgtm [py/call-to-non-callable] + + class CronjobState(enum.IntEnum): IDLE = 0 WAIT = 1 @@ -20,21 +29,36 @@ class CronjobState(enum.IntEnum): ERROR = 4 +class CronjobEvent(enum.IntEnum): + NONE = 0 + STOP = 1 + TIME_SYNC = 2 + + class Cronjob(threading.Thread): def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression self.name = name self.state = CronjobState.IDLE - self._should_stop = threading.Event() + self._event = threading.Event() + self._event_type = CronjobEvent.NONE + self._event_lock = threading.RLock() - if isinstance(actions, dict) or isinstance(actions, list): - self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) + if isinstance(actions, (list, dict)): + self.actions = Procedure.build( + name=name + '__Cron', _async=False, requests=actions + ) else: self.actions = actions + def notify(self, event: CronjobEvent): + with self._event_lock: + self._event_type = event + self._event.set() + def run(self): - self.state = CronjobState.WAIT + set_thread_name(f'cron:{self.name}') self.wait() if self.should_stop(): return @@ -57,26 +81,38 @@ class Cronjob(threading.Thread): self.state = CronjobState.ERROR def wait(self): - now = datetime.datetime.now().replace(tzinfo=gettz()) # lgtm [py/call-to-non-callable] - cron = croniter.croniter(self.cron_expression, now) - next_run = cron.get_next() - self._should_stop.wait(next_run - now.timestamp()) + with self._event_lock: + self.state = CronjobState.WAIT + self._event.clear() + self._event_type = CronjobEvent.TIME_SYNC + + while self._event_type == CronjobEvent.TIME_SYNC: + now = get_now() + self._event_type = CronjobEvent.NONE + cron = croniter.croniter(self.cron_expression, now) + next_run = cron.get_next() + self._event.wait(max(0, next_run - now.timestamp())) def stop(self): - self._should_stop.set() + self._event_type = CronjobEvent.STOP + self._event.set() def should_stop(self): - return self._should_stop.is_set() + return self._event_type == CronjobEvent.STOP class CronScheduler(threading.Thread): - def __init__(self, jobs): + def __init__(self, jobs, poll_seconds: float = 0.5): super().__init__() self.jobs_config = jobs self._jobs = {} + self._poll_seconds = max(1e-3, poll_seconds) self._should_stop = threading.Event() - logger.info('Cron scheduler initialized with {} jobs'. - format(len(self.jobs_config.keys()))) + logger.info( + 'Cron scheduler initialized with {} jobs'.format( + len(self.jobs_config.keys()) + ) + ) def _get_job(self, name, config): job = self._jobs.get(name) @@ -84,14 +120,21 @@ class CronScheduler(threading.Thread): return job if isinstance(config, dict): - self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'], - actions=config['actions']) + self._jobs[name] = Cronjob( + name=name, + cron_expression=config['cron_expression'], + actions=config['actions'], + ) elif is_functional_cron(config): - self._jobs[name] = Cronjob(name=name, cron_expression=config.cron_expression, - actions=config) + self._jobs[name] = Cronjob( + name=name, cron_expression=config.cron_expression, actions=config + ) else: - raise AssertionError('Expected type dict or function for cron {}, got {}'.format( - name, type(config))) + raise AssertionError( + 'Expected type dict or function for cron {}, got {}'.format( + name, type(config) + ) + ) return self._jobs[name] @@ -112,7 +155,22 @@ class CronScheduler(threading.Thread): if job.state == CronjobState.IDLE: job.start() - self._should_stop.wait(timeout=0.5) + t_before_wait = get_now().timestamp() + self._should_stop.wait(timeout=self._poll_seconds) + t_after_wait = get_now().timestamp() + time_drift = abs(t_after_wait - t_before_wait) - self._poll_seconds + + if not self.should_stop() and time_drift > 1: + # If the system clock has been adjusted by more than one second + # (e.g. because of DST change or NTP sync) then ensure that the + # registered cronjobs are synchronized with the new datetime + logger.info( + 'System clock drift detected: %f secs. Synchronizing the cronjobs', + time_drift, + ) + + for job in self._jobs.values(): + job.notify(CronjobEvent.TIME_SYNC) logger.info('Terminating cron scheduler') diff --git a/tests/etc/scripts/test_cron.py b/tests/etc/scripts/test_cron.py index 6618dcb3f..172dc363d 100644 --- a/tests/etc/scripts/test_cron.py +++ b/tests/etc/scripts/test_cron.py @@ -2,25 +2,34 @@ import datetime from platypush.cron import cron -from tests.test_cron import tmp_files, tmp_files_ready, \ - test_timeout, expected_cron_file_content +from tests.test_cron import test_timeout, cron_queue + + +def make_cron_expr(cron_time: datetime.datetime): + return '{min} {hour} {day} {month} * {sec}'.format( + min=cron_time.minute, + hour=cron_time.hour, + day=cron_time.day, + month=cron_time.month, + sec=cron_time.second, + ) + # Prepare a cronjob that should start test_timeout/2 seconds from the application start -cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout/2) -cron_expr = '{min} {hour} {day} {month} * {sec}'.format( - min=cron_time.minute, hour=cron_time.hour, day=cron_time.day, - month=cron_time.month, sec=cron_time.second) +cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout / 2) -@cron(cron_expr) +@cron(make_cron_expr(cron_time)) def cron_test(**_): - """ - Simple cronjob that awaits for ``../test_cron.py`` to be ready and writes the expected - content to the monitored temporary file. - """ - files_ready = tmp_files_ready.wait(timeout=test_timeout) - assert files_ready, \ - 'The test did not prepare the temporary files within {} seconds'.format(test_timeout) + cron_queue.put('cron_test') - with open(tmp_files[0], 'w') as f: - f.write(expected_cron_file_content) + +# Prepare another cronjob that should start 1hr + test_timeout/2 seconds from the application start +cron_time = datetime.datetime.now() + datetime.timedelta( + hours=1, seconds=test_timeout / 2 +) + + +@cron(make_cron_expr(cron_time)) +def cron_1hr_test(**_): + cron_queue.put('cron_1hr_test') diff --git a/tests/test_cron.py b/tests/test_cron.py index c7a76a873..5bc41e636 100644 --- a/tests/test_cron.py +++ b/tests/test_cron.py @@ -1,43 +1,61 @@ -import os +import datetime +import queue import pytest -import tempfile -import threading import time -tmp_files = [] -tmp_files_ready = threading.Event() +from dateutil.tz import gettz +from mock import patch + test_timeout = 10 -expected_cron_file_content = 'The cronjob ran successfully!' +cron_queue = queue.Queue() -@pytest.fixture(scope='module', autouse=True) -def tmp_file(*_): - tmp_file = tempfile.NamedTemporaryFile(prefix='platypush-test-cron-', - suffix='.txt', delete=False) - tmp_files.append(tmp_file.name) - tmp_files_ready.set() - yield tmp_file.name +class MockDatetime(datetime.datetime): + timedelta = datetime.timedelta() - for f in tmp_files: - if os.path.isfile(f): - os.unlink(f) + @classmethod + def now(cls): + return super().now(tz=gettz()) + cls.timedelta -def test_cron_execution(tmp_file): +def _test_cron_queue(expected_msg: str): + msg = None + test_start = time.time() + while time.time() - test_start <= test_timeout and msg != expected_msg: + try: + msg = cron_queue.get(block=True, timeout=test_timeout) + except queue.Empty: + break + + assert msg == expected_msg, 'The expected cronjob has not been executed' + + +def test_cron_execution(): """ Test that the cronjob in ``../etc/scripts/test_cron.py`` runs successfully. """ - actual_cron_file_content = None - test_start = time.time() + _test_cron_queue('cron_test') - while actual_cron_file_content != expected_cron_file_content and \ - time.time() - test_start < test_timeout: - with open(tmp_file, 'r') as f: - actual_cron_file_content = f.read() - time.sleep(0.5) - assert actual_cron_file_content == expected_cron_file_content, \ - 'cron_test failed to run within {} seconds'.format(test_timeout) +def test_cron_execution_upon_system_clock_change(): + """ + Test that the cronjob runs at the right time even upon DST or other + system clock changes. + """ + # Mock datetime.datetime with a class that has overridable timedelta + patcher = patch('datetime.datetime', MockDatetime) + + try: + patcher.start() + time.sleep(1) + # Simulate a +1hr shift on the system clock + MockDatetime.timedelta = datetime.timedelta(hours=1) + time.sleep(1) + finally: + patcher.stop() + + # Ensure that the cronjob that was supposed to run in an hour is now running + _test_cron_queue('cron_1hr_test') if __name__ == '__main__': From ba23eb72807625a13aa33e448ae2026e75917e3b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 28 Apr 2022 01:04:30 +0200 Subject: [PATCH 3/3] Small LINT fix --- platypush/cron/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index 74eaaae5e..0671cfd0d 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -17,8 +17,8 @@ def get_now() -> datetime.datetime: :return: A timezone-aware representation of `now` """ return datetime.datetime.now().replace( - tzinfo=gettz() - ) # lgtm [py/call-to-non-callable] + tzinfo=gettz() # lgtm [py/call-to-non-callable] + ) class CronjobState(enum.IntEnum):