Support for cron actions, solves #47

This commit is contained in:
Fabio Manganiello 2018-01-15 22:36:24 +01:00
parent f6d16366eb
commit 14b511034f
5 changed files with 112 additions and 2 deletions

View file

@ -8,6 +8,7 @@ from threading import Thread
from .bus import Bus from .bus import Bus
from .config import Config from .config import Config
from .context import register_backends from .context import register_backends
from .cron.scheduler import CronScheduler
from .event.processor import EventProcessor from .event.processor import EventProcessor
from .message.event import Event, StopEvent from .message.event import Event, StopEvent
from .message.request import Request from .message.request import Request
@ -110,6 +111,9 @@ class Daemon(object):
for backend in self.backends.values(): for backend in self.backends.values():
backend.start() backend.start()
# Start the cron scheduler
CronScheduler(jobs=Config.get_cronjobs()).start()
# Poll for messages on the bus # Poll for messages on the bus
try: try:
self.bus.poll() self.bus.poll()

View file

@ -34,8 +34,8 @@ class Config(object):
] ]
_default_constants = { _default_constants = {
'today': datetime.date.today().isoformat, 'today': datetime.date.today,
'now': datetime.datetime.now().isoformat, 'now': datetime.datetime.now,
} }
_workdir_location = os.path.join(os.environ['HOME'], '.local', 'share', 'platypush') _workdir_location = os.path.join(os.environ['HOME'], '.local', 'share', 'platypush')
@ -76,8 +76,10 @@ class Config(object):
self.event_hooks = {} self.event_hooks = {}
self.procedures = {} self.procedures = {}
self.constants = {} self.constants = {}
self.cronjobs = []
self._init_constants() self._init_constants()
self._init_cronjobs()
self._init_components() self._init_components()
@ -133,6 +135,12 @@ class Config(object):
self.constants[key] = value self.constants[key] = value
def _init_cronjobs(self):
if 'cron' in self._config:
for job in self._config['cron']['jobs']:
self.cronjobs.append(job)
@staticmethod @staticmethod
def get_backends(): def get_backends():
global _default_config_instance global _default_config_instance
@ -176,6 +184,12 @@ class Config(object):
value = _default_config_instance.constants[name] value = _default_config_instance.constants[name]
return value() if callable(value) else value return value() if callable(value) else value
@staticmethod
def get_cronjobs():
global _default_config_instance
if _default_config_instance is None: _default_config_instance = Config()
return _default_config_instance.cronjobs
@staticmethod @staticmethod
def get_default_pusher_backend(): def get_default_pusher_backend():
""" """

View file

View file

@ -0,0 +1,86 @@
import datetime
import logging
import re
import time
from threading import Thread
from platypush.event.hook import EventAction
class Cronjob(Thread):
def __init__(self, name, cron_expression, actions, *args, **kwargs):
super().__init__()
self.cron_expression = cron_expression
self.actions = []
for action in actions:
self.actions.append(EventAction.build(action))
def run(self):
logging.info('Running cronjob {}'.format(self.name))
response = None
context = {}
for action in self.actions:
response = action.execute(async=False, **context)
logging.info('Response from cronjob {}: {}'.format(self.name, response))
def should_run(self):
units = ('minute', 'hour', 'day', 'month', 'year')
now = datetime.datetime.fromtimestamp(time.time())
cron_units = re.split('\s+', self.cron_expression)
for i in range(0, len(units)):
unit = units[i]
now_unit = getattr(now, unit)
cron_unit = cron_units[i].replace('*', str(now_unit))
m = re.match('(\d+)(/(\d+))?', cron_unit)
if m.group(3):
if int(m.group(1)) % int(m.group(3)):
return False
elif m:
if int(m.group(1)) != now_unit:
return False
else:
raise RuntimeError('Invalid cron expression for job {}: {}'.
format(self.name, self.cron_expression))
return True
class CronScheduler(Thread):
def __init__(self, jobs, *args, **kwargs):
super().__init__()
self.jobs_config = jobs
logging.info('Cron scheduler initialized with {} jobs'
.format(len(self.jobs_config)))
@classmethod
def _build_job(cls, job_config):
if isinstance(job_config, dict):
job = Cronjob(cron_expression=job_config['cron_expression'],
name=job_config['name'],
actions=job_config['actions'])
assert isinstance(job, Cronjob)
return job
def run(self):
logging.info('Running cron scheduler')
while True:
for job_config in self.jobs_config:
job = self._build_job(job_config)
if job.should_run():
job.start()
time.sleep(60)
# vim:sw=4:ts=4:et:

View file

@ -1,4 +1,5 @@
import copy import copy
import datetime
import json import json
import logging import logging
import random import random
@ -114,6 +115,11 @@ class Request(Message):
try: try:
context_value = eval("context['{}']{}".format( context_value = eval("context['{}']{}".format(
context_argname, path if path else '')) context_argname, path if path else ''))
if callable(context_value):
context_value = context_value()
if isinstance(context_value, datetime.date):
context_value = context_value.isoformat()
except: context_value = expr except: context_value = expr
parsed_value += prefix + ( parsed_value += prefix + (