Merge branch 'master' into 29-generic-entities-support

This commit is contained in:
Fabio Manganiello 2022-12-20 23:06:19 +01:00
commit 0513339be7
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 73 additions and 13 deletions

View file

@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
Given the high speed of development in the first phase, changes are being Given the high speed of development in the first phase, changes are being
reported only starting from v0.20.2. reported only starting from v0.20.2.
## [0.24.4] - 2022-12-20
### Fixed
- Fixed cronjobs potentially being triggered even if it wasn't their slot in
case of clock synchronization events.
## [0.24.3] - 2022-12-17 ## [0.24.3] - 2022-12-17
### Added ### Added

View file

@ -25,7 +25,7 @@ from .message.response import Response
from .utils import set_thread_name, get_enabled_plugins from .utils import set_thread_name, get_enabled_plugins
__author__ = 'Fabio Manganiello <info@fabiomanganiello.com>' __author__ = 'Fabio Manganiello <info@fabiomanganiello.com>'
__version__ = '0.24.3' __version__ = '0.24.4'
log = logging.getLogger('platypush') log = logging.getLogger('platypush')

View file

@ -2,6 +2,7 @@ import datetime
import enum import enum
import logging import logging
import threading import threading
from typing import Dict
import croniter import croniter
from dateutil.tz import gettz from dateutil.tz import gettz
@ -22,6 +23,10 @@ def get_now() -> datetime.datetime:
class CronjobState(enum.IntEnum): class CronjobState(enum.IntEnum):
"""
An enum used to model the possible states of a cronjob.
"""
IDLE = 0 IDLE = 0
WAIT = 1 WAIT = 1
RUNNING = 2 RUNNING = 2
@ -30,12 +35,22 @@ class CronjobState(enum.IntEnum):
class CronjobEvent(enum.IntEnum): class CronjobEvent(enum.IntEnum):
"""
A list of events used to synchronize with a cronjob.
"""
NONE = 0 NONE = 0
STOP = 1 STOP = 1
TIME_SYNC = 2 TIME_SYNC = 2
class Cronjob(threading.Thread): class Cronjob(threading.Thread):
"""
Representation of a cronjob. The inner logic is wrapped by a thread that
waits until the next execution slot, and can quickly synchronize in case of
clock change/drift.
"""
def __init__(self, name, cron_expression, actions): def __init__(self, name, cron_expression, actions):
super().__init__() super().__init__()
self.cron_expression = cron_expression self.cron_expression = cron_expression
@ -53,13 +68,23 @@ class Cronjob(threading.Thread):
self.actions = actions self.actions = actions
def notify(self, event: CronjobEvent): def notify(self, event: CronjobEvent):
"""
Send an event to this cronjob.
"""
with self._event_lock: with self._event_lock:
self._event_type = event self._event_type = event
self._event.set() self._event.set()
def run(self): def run(self):
"""
Inner logic of the cronjob thread.
"""
set_thread_name(f'cron:{self.name}') set_thread_name(f'cron:{self.name}')
# Wait until an event is received or the next execution slot is reached
self.wait() self.wait()
# Early exit if we received a stop event
if self.should_stop(): if self.should_stop():
return return
@ -70,8 +95,10 @@ class Cronjob(threading.Thread):
context = {} context = {}
if isinstance(self.actions, Procedure): if isinstance(self.actions, Procedure):
# If the cronjob wraps a procedure, execute it
response = self.actions.execute(_async=False, **context) response = self.actions.execute(_async=False, **context)
else: else:
# Otherwise, execute the scheduled actions one by one
response = self.actions(**context) response = self.actions(**context)
logger.info('Response from cronjob {}: {}'.format(self.name, response)) logger.info('Response from cronjob {}: {}'.format(self.name, response))
@ -81,31 +108,48 @@ class Cronjob(threading.Thread):
self.state = CronjobState.ERROR self.state = CronjobState.ERROR
def wait(self): def wait(self):
"""
Wait until the next slot is reached.
"""
# Set the cronjob in WAIT state
with self._event_lock: with self._event_lock:
self.state = CronjobState.WAIT self.state = CronjobState.WAIT
self._event.clear() self._event.clear()
self._event_type = CronjobEvent.TIME_SYNC self._event_type = CronjobEvent.TIME_SYNC
# Keep iterating until it's our time to run. If we receive clock
# synchronization events, the cronjob updates its next expected run and
# keeps waiting.
while self._event_type == CronjobEvent.TIME_SYNC: while self._event_type == CronjobEvent.TIME_SYNC:
now = get_now()
self._event_type = CronjobEvent.NONE self._event_type = CronjobEvent.NONE
cron = croniter.croniter(self.cron_expression, now) next_run = self._get_next_run_secs()
next_run = cron.get_next(datetime.datetime) self._event.wait(next_run)
self._event.wait(max(0, (next_run - now).total_seconds()))
def stop(self): with self._event_lock:
self._event_type = CronjobEvent.STOP self._event.clear()
self._event.set()
def _get_next_run_secs(self) -> int:
"""
Get the number of seconds between now and the next scheduled run.
"""
now = get_now()
cron = croniter.croniter(self.cron_expression, now)
next_run = cron.get_next(datetime.datetime)
return max(0, (next_run - now).total_seconds())
def should_stop(self): def should_stop(self):
return self._event_type == CronjobEvent.STOP return self._event_type == CronjobEvent.STOP
class CronScheduler(threading.Thread): class CronScheduler(threading.Thread):
"""
Main cron scheduler job.
"""
def __init__(self, jobs, poll_seconds: float = 0.5): def __init__(self, jobs, poll_seconds: float = 0.5):
super().__init__() super().__init__()
self.jobs_config = jobs self.jobs_config = jobs
self._jobs = {} self._jobs: Dict[str, Cronjob] = {}
self._poll_seconds = max(1e-3, poll_seconds) self._poll_seconds = max(1e-3, poll_seconds)
self._should_stop = threading.Event() self._should_stop = threading.Event()
logger.info( logger.info(
@ -114,18 +158,24 @@ class CronScheduler(threading.Thread):
) )
) )
def _get_job(self, name, config): def _get_job(self, name, config) -> Cronjob:
"""
Get a cronjob by name.
"""
# Check if the cronjob has already been indexed.
job = self._jobs.get(name) job = self._jobs.get(name)
if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]: if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]:
return job return job
if isinstance(config, dict): if isinstance(config, dict):
# If the cronjob is a static list of actions, initialize it from dict
self._jobs[name] = Cronjob( self._jobs[name] = Cronjob(
name=name, name=name,
cron_expression=config['cron_expression'], cron_expression=config['cron_expression'],
actions=config['actions'], actions=config['actions'],
) )
elif is_functional_cron(config): elif is_functional_cron(config):
# Otherwise, initialize it as a native Python function
self._jobs[name] = Cronjob( self._jobs[name] = Cronjob(
name=name, cron_expression=config.cron_expression, actions=config name=name, cron_expression=config.cron_expression, actions=config
) )
@ -139,8 +189,11 @@ class CronScheduler(threading.Thread):
return self._jobs[name] return self._jobs[name]
def stop(self): def stop(self):
"""
Stop the scheduler and send a STOP signal to all the registered cronjobs.
"""
for job in self._jobs.values(): for job in self._jobs.values():
job.stop() job.notify(CronjobEvent.STOP)
self._should_stop.set() self._should_stop.set()
def should_stop(self): def should_stop(self):

View file

@ -1,5 +1,5 @@
[bumpversion] [bumpversion]
current_version = 0.24.3 current_version = 0.24.4
commit = True commit = True
tag = True tag = True

View file

@ -28,7 +28,7 @@ backend = pkg_files('platypush/backend')
setup( setup(
name="platypush", name="platypush",
version="0.24.3", version="0.24.4",
author="Fabio Manganiello", author="Fabio Manganiello",
author_email="info@fabiomanganiello.com", author_email="info@fabiomanganiello.com",
description="Platypush service", description="Platypush service",