platypush/platypush/cron/scheduler.py

86 lines
2.4 KiB
Python
Raw Normal View History

2018-01-15 22:36:24 +01:00
import datetime
import logging
import re
import time
from threading import Thread
from platypush.procedure import Procedure
2018-01-15 22:36:24 +01:00
2018-06-06 20:09:18 +02:00
logger = logging.getLogger(__name__)
2018-01-15 22:36:24 +01:00
class Cronjob(Thread):
def __init__(self, name, cron_expression, actions, *args, **kwargs):
super().__init__()
self.cron_expression = cron_expression
2018-01-15 22:44:57 +01:00
self.name = name
self.actions = Procedure.build(name=name+'__Cron', _async=False,
requests=actions)
2018-01-15 22:36:24 +01:00
def run(self):
2018-06-06 20:09:18 +02:00
logger.info('Running cronjob {}'.format(self.name))
2018-01-15 22:36:24 +01:00
response = None
context = {}
response = self.actions.execute(_async=False, **context)
logger.info('Response from cronjob {}: {}'.format(self.name, response))
2018-01-15 22:36:24 +01:00
def should_run(self):
2019-09-27 15:00:00 +02:00
units = ('minute', 'hour', 'day', 'month', 'year')
now = datetime.datetime.fromtimestamp(time.time())
2019-09-27 15:00:00 +02:00
cron_units = re.split('\s+', self.cron_expression)
2018-01-15 22:36:24 +01:00
for i in range(0, len(units)):
unit = units[i]
now_unit = getattr(now, unit)
cron_unit = cron_units[i].replace('*', str(now_unit))
m = re.match('(\d+)(/(\d+))?', cron_unit)
if m.group(3):
if int(m.group(1)) % int(m.group(3)):
return False
elif m:
if int(m.group(1)) != now_unit:
return False
else:
raise RuntimeError('Invalid cron expression for job {}: {}'.
format(self.name, self.cron_expression))
return True
class CronScheduler(Thread):
def __init__(self, jobs, *args, **kwargs):
super().__init__()
self.jobs_config = jobs
2018-06-06 20:09:18 +02:00
logger.info('Cron scheduler initialized with {} jobs'
2018-01-15 22:44:57 +01:00
.format(len(self.jobs_config.keys())))
2018-01-15 22:36:24 +01:00
@classmethod
2018-01-15 22:44:57 +01:00
def _build_job(cls, name, config):
if isinstance(config, dict):
job = Cronjob(name=name, cron_expression=config['cron_expression'],
actions=config['actions'])
2018-01-15 22:36:24 +01:00
assert isinstance(job, Cronjob)
return job
def run(self):
2018-06-06 20:09:18 +02:00
logger.info('Running cron scheduler')
2018-01-15 22:36:24 +01:00
while True:
2018-01-15 22:44:57 +01:00
for (job_name, job_config) in self.jobs_config.items():
job = self._build_job(name=job_name, config=job_config)
2018-01-15 22:36:24 +01:00
if job.should_run():
job.start()
2019-09-27 15:00:00 +02:00
time.sleep(60)
2018-01-15 22:36:24 +01:00
# vim:sw=4:ts=4:et: