Merge branch '217-cron-decorated-function-are-off-after-dst-change' into 'master'

Resolve "@cron decorated function are off after DST change"

Closes #217

See merge request platypush/platypush!13
This commit is contained in:
Fabio Manganiello 2022-04-28 01:14:18 +02:00
commit e9454b0c0f
5 changed files with 166 additions and 74 deletions

View file

@ -26,14 +26,14 @@ main_bus = None
def register_backends(bus=None, global_scope=False, **kwargs): def register_backends(bus=None, global_scope=False, **kwargs):
""" Initialize the backend objects based on the configuration and returns """Initialize the backend objects based on the configuration and returns
a name -> backend_instance map. a name -> backend_instance map.
Params: Params:
bus -- If specific (it usually should), the messages processed by the bus -- If specific (it usually should), the messages processed by the
backends will be posted on this bus. backends will be posted on this bus.
kwargs -- Any additional key-value parameters required to initialize the backends kwargs -- Any additional key-value parameters required to initialize the backends
""" """
global main_bus global main_bus
if bus: if bus:
@ -57,8 +57,7 @@ def register_backends(bus=None, global_scope=False, **kwargs):
b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs) b = getattr(module, cls_name)(bus=bus, **cfg, **kwargs)
backends[name] = b backends[name] = b
except AttributeError as e: except AttributeError as e:
logger.warning('No such class in {}: {}'.format( logger.warning('No such class in {}: {}'.format(module.__name__, cls_name))
module.__name__, cls_name))
raise RuntimeError(e) raise RuntimeError(e)
return backends return backends
@ -74,15 +73,15 @@ def register_plugins(bus=None):
def get_backend(name): def get_backend(name):
""" Returns the backend instance identified by name if it exists """ """Returns the backend instance identified by name if it exists"""
global backends global backends
return backends.get(name) return backends.get(name)
def get_plugin(plugin_name, reload=False): def get_plugin(plugin_name, reload=False):
""" Registers a plugin instance by name if not registered already, or """Registers a plugin instance by name if not registered already, or
returns the registered plugin instance""" returns the registered plugin instance"""
global plugins global plugins
global plugins_init_locks global plugins_init_locks
@ -104,8 +103,9 @@ def get_plugin(plugin_name, reload=False):
cls_name += token.title() cls_name += token.title()
cls_name += 'Plugin' cls_name += 'Plugin'
plugin_conf = Config.get_plugins()[plugin_name] \ plugin_conf = (
if plugin_name in Config.get_plugins() else {} Config.get_plugins()[plugin_name] if plugin_name in Config.get_plugins() else {}
)
if 'disabled' in plugin_conf: if 'disabled' in plugin_conf:
if plugin_conf['disabled'] is True: if plugin_conf['disabled'] is True:
@ -120,7 +120,9 @@ def get_plugin(plugin_name, reload=False):
try: try:
plugin_class = getattr(plugin, cls_name) plugin_class = getattr(plugin, cls_name)
except AttributeError as e: except AttributeError as e:
logger.warning('No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e))) logger.warning(
'No such class in {}: {} [error: {}]'.format(plugin_name, cls_name, str(e))
)
raise RuntimeError(e) raise RuntimeError(e)
with plugins_init_locks[plugin_name]: with plugins_init_locks[plugin_name]:
@ -137,13 +139,14 @@ def get_bus() -> Bus:
return main_bus return main_bus
from platypush.bus.redis import RedisBus from platypush.bus.redis import RedisBus
return RedisBus() return RedisBus()
def get_or_create_event_loop(): def get_or_create_event_loop():
try: try:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
except RuntimeError: except (DeprecationWarning, RuntimeError):
loop = asyncio.new_event_loop() loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)

View file

@ -7,11 +7,20 @@ import croniter
from dateutil.tz import gettz from dateutil.tz import gettz
from platypush.procedure import Procedure from platypush.procedure import Procedure
from platypush.utils import is_functional_cron from platypush.utils import is_functional_cron, set_thread_name
logger = logging.getLogger('platypush:cron') logger = logging.getLogger('platypush:cron')
def get_now() -> datetime.datetime:
"""
:return: A timezone-aware representation of `now`
"""
return datetime.datetime.now().replace(
tzinfo=gettz() # lgtm [py/call-to-non-callable]
)
class CronjobState(enum.IntEnum): class CronjobState(enum.IntEnum):
IDLE = 0 IDLE = 0
WAIT = 1 WAIT = 1
@ -20,21 +29,36 @@ class CronjobState(enum.IntEnum):
ERROR = 4 ERROR = 4
class CronjobEvent(enum.IntEnum):
NONE = 0
STOP = 1
TIME_SYNC = 2
class Cronjob(threading.Thread): class Cronjob(threading.Thread):
def __init__(self, name, cron_expression, actions): def __init__(self, name, cron_expression, actions):
super().__init__() super().__init__()
self.cron_expression = cron_expression self.cron_expression = cron_expression
self.name = name self.name = name
self.state = CronjobState.IDLE self.state = CronjobState.IDLE
self._should_stop = threading.Event() self._event = threading.Event()
self._event_type = CronjobEvent.NONE
self._event_lock = threading.RLock()
if isinstance(actions, dict) or isinstance(actions, list): if isinstance(actions, (list, dict)):
self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) self.actions = Procedure.build(
name=name + '__Cron', _async=False, requests=actions
)
else: else:
self.actions = actions self.actions = actions
def notify(self, event: CronjobEvent):
with self._event_lock:
self._event_type = event
self._event.set()
def run(self): def run(self):
self.state = CronjobState.WAIT set_thread_name(f'cron:{self.name}')
self.wait() self.wait()
if self.should_stop(): if self.should_stop():
return return
@ -57,26 +81,38 @@ class Cronjob(threading.Thread):
self.state = CronjobState.ERROR self.state = CronjobState.ERROR
def wait(self): def wait(self):
now = datetime.datetime.now().replace(tzinfo=gettz()) # lgtm [py/call-to-non-callable] with self._event_lock:
cron = croniter.croniter(self.cron_expression, now) self.state = CronjobState.WAIT
next_run = cron.get_next() self._event.clear()
self._should_stop.wait(next_run - now.timestamp()) self._event_type = CronjobEvent.TIME_SYNC
while self._event_type == CronjobEvent.TIME_SYNC:
now = get_now()
self._event_type = CronjobEvent.NONE
cron = croniter.croniter(self.cron_expression, now)
next_run = cron.get_next()
self._event.wait(max(0, next_run - now.timestamp()))
def stop(self): def stop(self):
self._should_stop.set() self._event_type = CronjobEvent.STOP
self._event.set()
def should_stop(self): def should_stop(self):
return self._should_stop.is_set() return self._event_type == CronjobEvent.STOP
class CronScheduler(threading.Thread): class CronScheduler(threading.Thread):
def __init__(self, jobs): def __init__(self, jobs, poll_seconds: float = 0.5):
super().__init__() super().__init__()
self.jobs_config = jobs self.jobs_config = jobs
self._jobs = {} self._jobs = {}
self._poll_seconds = max(1e-3, poll_seconds)
self._should_stop = threading.Event() self._should_stop = threading.Event()
logger.info('Cron scheduler initialized with {} jobs'. logger.info(
format(len(self.jobs_config.keys()))) 'Cron scheduler initialized with {} jobs'.format(
len(self.jobs_config.keys())
)
)
def _get_job(self, name, config): def _get_job(self, name, config):
job = self._jobs.get(name) job = self._jobs.get(name)
@ -84,14 +120,21 @@ class CronScheduler(threading.Thread):
return job return job
if isinstance(config, dict): if isinstance(config, dict):
self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'], self._jobs[name] = Cronjob(
actions=config['actions']) name=name,
cron_expression=config['cron_expression'],
actions=config['actions'],
)
elif is_functional_cron(config): elif is_functional_cron(config):
self._jobs[name] = Cronjob(name=name, cron_expression=config.cron_expression, self._jobs[name] = Cronjob(
actions=config) name=name, cron_expression=config.cron_expression, actions=config
)
else: else:
raise AssertionError('Expected type dict or function for cron {}, got {}'.format( raise AssertionError(
name, type(config))) 'Expected type dict or function for cron {}, got {}'.format(
name, type(config)
)
)
return self._jobs[name] return self._jobs[name]
@ -112,7 +155,22 @@ class CronScheduler(threading.Thread):
if job.state == CronjobState.IDLE: if job.state == CronjobState.IDLE:
job.start() job.start()
self._should_stop.wait(timeout=0.5) t_before_wait = get_now().timestamp()
self._should_stop.wait(timeout=self._poll_seconds)
t_after_wait = get_now().timestamp()
time_drift = abs(t_after_wait - t_before_wait) - self._poll_seconds
if not self.should_stop() and time_drift > 1:
# If the system clock has been adjusted by more than one second
# (e.g. because of DST change or NTP sync) then ensure that the
# registered cronjobs are synchronized with the new datetime
logger.info(
'System clock drift detected: %f secs. Synchronizing the cronjobs',
time_drift,
)
for job in self._jobs.values():
job.notify(CronjobEvent.TIME_SYNC)
logger.info('Terminating cron scheduler') logger.info('Terminating cron scheduler')

View file

@ -2,3 +2,7 @@
skip-string-normalization = true skip-string-normalization = true
skip-numeric-underscore-normalization = true skip-numeric-underscore-normalization = true
[tool.pytest.ini_options]
filterwarnings = [
'ignore:There is no current event loop:DeprecationWarning',
]

View file

@ -2,25 +2,34 @@ import datetime
from platypush.cron import cron from platypush.cron import cron
from tests.test_cron import tmp_files, tmp_files_ready, \ from tests.test_cron import test_timeout, cron_queue
test_timeout, expected_cron_file_content
def make_cron_expr(cron_time: datetime.datetime):
return '{min} {hour} {day} {month} * {sec}'.format(
min=cron_time.minute,
hour=cron_time.hour,
day=cron_time.day,
month=cron_time.month,
sec=cron_time.second,
)
# Prepare a cronjob that should start test_timeout/2 seconds from the application start # Prepare a cronjob that should start test_timeout/2 seconds from the application start
cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout/2) cron_time = datetime.datetime.now() + datetime.timedelta(seconds=test_timeout / 2)
cron_expr = '{min} {hour} {day} {month} * {sec}'.format(
min=cron_time.minute, hour=cron_time.hour, day=cron_time.day,
month=cron_time.month, sec=cron_time.second)
@cron(cron_expr) @cron(make_cron_expr(cron_time))
def cron_test(**_): def cron_test(**_):
""" cron_queue.put('cron_test')
Simple cronjob that awaits for ``../test_cron.py`` to be ready and writes the expected
content to the monitored temporary file.
"""
files_ready = tmp_files_ready.wait(timeout=test_timeout)
assert files_ready, \
'The test did not prepare the temporary files within {} seconds'.format(test_timeout)
with open(tmp_files[0], 'w') as f:
f.write(expected_cron_file_content) # Prepare another cronjob that should start 1hr + test_timeout/2 seconds from the application start
cron_time = datetime.datetime.now() + datetime.timedelta(
hours=1, seconds=test_timeout / 2
)
@cron(make_cron_expr(cron_time))
def cron_1hr_test(**_):
cron_queue.put('cron_1hr_test')

View file

@ -1,43 +1,61 @@
import os import datetime
import queue
import pytest import pytest
import tempfile
import threading
import time import time
tmp_files = [] from dateutil.tz import gettz
tmp_files_ready = threading.Event() from mock import patch
test_timeout = 10 test_timeout = 10
expected_cron_file_content = 'The cronjob ran successfully!' cron_queue = queue.Queue()
@pytest.fixture(scope='module', autouse=True) class MockDatetime(datetime.datetime):
def tmp_file(*_): timedelta = datetime.timedelta()
tmp_file = tempfile.NamedTemporaryFile(prefix='platypush-test-cron-',
suffix='.txt', delete=False)
tmp_files.append(tmp_file.name)
tmp_files_ready.set()
yield tmp_file.name
for f in tmp_files: @classmethod
if os.path.isfile(f): def now(cls):
os.unlink(f) return super().now(tz=gettz()) + cls.timedelta
def test_cron_execution(tmp_file): def _test_cron_queue(expected_msg: str):
msg = None
test_start = time.time()
while time.time() - test_start <= test_timeout and msg != expected_msg:
try:
msg = cron_queue.get(block=True, timeout=test_timeout)
except queue.Empty:
break
assert msg == expected_msg, 'The expected cronjob has not been executed'
def test_cron_execution():
""" """
Test that the cronjob in ``../etc/scripts/test_cron.py`` runs successfully. Test that the cronjob in ``../etc/scripts/test_cron.py`` runs successfully.
""" """
actual_cron_file_content = None _test_cron_queue('cron_test')
test_start = time.time()
while actual_cron_file_content != expected_cron_file_content and \
time.time() - test_start < test_timeout:
with open(tmp_file, 'r') as f:
actual_cron_file_content = f.read()
time.sleep(0.5)
assert actual_cron_file_content == expected_cron_file_content, \ def test_cron_execution_upon_system_clock_change():
'cron_test failed to run within {} seconds'.format(test_timeout) """
Test that the cronjob runs at the right time even upon DST or other
system clock changes.
"""
# Mock datetime.datetime with a class that has overridable timedelta
patcher = patch('datetime.datetime', MockDatetime)
try:
patcher.start()
time.sleep(1)
# Simulate a +1hr shift on the system clock
MockDatetime.timedelta = datetime.timedelta(hours=1)
time.sleep(1)
finally:
patcher.stop()
# Ensure that the cronjob that was supposed to run in an hour is now running
_test_cron_queue('cron_1hr_test')
if __name__ == '__main__': if __name__ == '__main__':