Smarter cron management with croniter

This commit is contained in:
Fabio Manganiello 2019-09-28 01:34:27 +02:00
parent e9eda49c91
commit faa55daccf
4 changed files with 55 additions and 46 deletions

View file

@ -225,6 +225,7 @@ autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers',
'picamera', 'picamera',
'pwm3901', 'pwm3901',
'PIL', 'PIL',
'croniter',
] ]
sys.path.insert(0, os.path.abspath('../..')) sys.path.insert(0, os.path.abspath('../..'))

View file

@ -1,8 +1,9 @@
import datetime import enum
import logging import logging
import re
import time import time
import croniter
from threading import Thread from threading import Thread
from platypush.procedure import Procedure from platypush.procedure import Procedure
@ -10,76 +11,79 @@ from platypush.procedure import Procedure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class CronjobState(enum.IntEnum):
IDLE = 0
WAIT = 1
RUNNING = 2
DONE = 3
ERROR = 4
class Cronjob(Thread): class Cronjob(Thread):
def __init__(self, name, cron_expression, actions, *args, **kwargs): def __init__(self, name, cron_expression, actions):
super().__init__() super().__init__()
self.cron_expression = cron_expression self.cron_expression = cron_expression
self.name = name self.name = name
self.state = CronjobState.IDLE
self.actions = Procedure.build(name=name + '__Cron', _async=False, self.actions = Procedure.build(name=name + '__Cron', _async=False,
requests=actions) requests=actions)
def run(self): def run(self):
self.state = CronjobState.WAIT
self.wait()
self.state = CronjobState.RUNNING
try:
logger.info('Running cronjob {}'.format(self.name)) logger.info('Running cronjob {}'.format(self.name))
response = None
context = {} context = {}
response = self.actions.execute(_async=False, **context) response = self.actions.execute(_async=False, **context)
logger.info('Response from cronjob {}: {}'.format(self.name, response)) logger.info('Response from cronjob {}: {}'.format(self.name, response))
self.state = CronjobState.DONE
except Exception as e:
logger.exception(e)
self.state = CronjobState.ERROR
def wait(self):
now = int(time.time())
cron = croniter.croniter(self.cron_expression, now)
next_run = int(cron.get_next())
time.sleep(next_run - now)
def should_run(self): def should_run(self):
units = ('minute', 'hour', 'day', 'month', 'year') now = int(time.time())
now = datetime.datetime.fromtimestamp(time.time()) cron = croniter.croniter(self.cron_expression, now)
cron_units = re.split('\s+', self.cron_expression) next_run = int(cron.get_next())
return now == next_run
for i in range(0, len(units)):
unit = units[i]
now_unit = getattr(now, unit)
cron_unit = cron_units[i].replace('*', str(now_unit))
m = re.match('(\d+)(/(\d+))?', cron_unit)
if m.group(3):
if int(m.group(1)) % int(m.group(3)):
return False
elif m:
if int(m.group(1)) != now_unit:
return False
else:
raise RuntimeError('Invalid cron expression for job {}: {}'.
format(self.name, self.cron_expression))
return True
class CronScheduler(Thread): class CronScheduler(Thread):
def __init__(self, jobs, *args, **kwargs): def __init__(self, jobs):
super().__init__() super().__init__()
self.jobs_config = jobs self.jobs_config = jobs
logger.info('Cron scheduler initialized with {} jobs' self._jobs = {}
.format(len(self.jobs_config.keys()))) logger.info('Cron scheduler initialized with {} jobs'.
format(len(self.jobs_config.keys())))
def _get_job(self, name, config):
@classmethod job = self._jobs.get(name)
def _build_job(cls, name, config): if job and job.state not in [CronjobState.DONE, CronjobState.ERROR]:
if isinstance(config, dict):
job = Cronjob(name=name, cron_expression=config['cron_expression'],
actions=config['actions'])
assert isinstance(job, Cronjob)
return job return job
self._jobs[name] = Cronjob(name=name, cron_expression=config['cron_expression'],
actions=config['actions'])
return self._jobs[name]
def run(self): def run(self):
logger.info('Running cron scheduler') logger.info('Running cron scheduler')
while True: while True:
for (job_name, job_config) in self.jobs_config.items(): for (job_name, job_config) in self.jobs_config.items():
job = self._build_job(name=job_name, config=job_config) job = self._get_job(name=job_name, config=job_config)
if job.should_run(): if job.state == CronjobState.IDLE:
job.start() job.start()
time.sleep(60) time.sleep(0.5)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -172,3 +172,6 @@ pyScss
# Support for machine learning CV plugin # Support for machine learning CV plugin
# cv2 # cv2
# numpy # numpy
# Support for cronjobs
croniter

View file

@ -133,6 +133,7 @@ setup(
'pyyaml', 'pyyaml',
'redis', 'redis',
'requests', 'requests',
'croniter',
], ],
extras_require = { extras_require = {
'Support for custom thread and process names': ['python-prctl'], 'Support for custom thread and process names': ['python-prctl'],