Make sure that all hanging threads, backends and services are stopped and their resources cleaned up when the application stops.

This commit is contained in:
Fabio Manganiello 2021-02-23 23:07:35 +01:00
parent d1b7b1768c
commit 2800bac3fb
20 changed files with 181 additions and 109 deletions

View file

@ -16,7 +16,7 @@ from .context import register_backends
from .cron.scheduler import CronScheduler from .cron.scheduler import CronScheduler
from .event.processor import EventProcessor from .event.processor import EventProcessor
from .logger import Logger from .logger import Logger
from .message.event import Event, StopEvent from .message.event import Event
from .message.event.application import ApplicationStartedEvent, ApplicationStoppedEvent from .message.event.application import ApplicationStartedEvent, ApplicationStoppedEvent
from .message.request import Request from .message.request import Request
from .message.response import Response from .message.response import Response
@ -80,6 +80,7 @@ class Daemon:
self.event_processor = EventProcessor() self.event_processor = EventProcessor()
self.requests_to_process = requests_to_process self.requests_to_process = requests_to_process
self.processed_requests = 0 self.processed_requests = 0
self.cron_scheduler = None
@classmethod @classmethod
def build_from_cmdline(cls, args): def build_from_cmdline(cls, args):
@ -135,9 +136,6 @@ class Daemon:
self.stop_app() self.stop_app()
elif isinstance(msg, Response): elif isinstance(msg, Response):
logger.info('Received response: {}'.format(msg)) logger.info('Received response: {}'.format(msg))
elif isinstance(msg, StopEvent) and msg.targets_me():
logger.info('Received STOP event: {}'.format(msg))
self.stop_app()
elif isinstance(msg, Event): elif isinstance(msg, Event):
if not msg.disable_logging: if not msg.disable_logging:
logger.info('Received event: {}'.format(msg)) logger.info('Received event: {}'.format(msg))
@ -147,9 +145,14 @@ class Daemon:
def stop_app(self): def stop_app(self):
""" Stops the backends and the bus """ """ Stops the backends and the bus """
self.bus.post(ApplicationStoppedEvent())
for backend in self.backends.values(): for backend in self.backends.values():
backend.stop() backend.stop()
self.bus.stop() self.bus.stop()
if self.cron_scheduler:
self.cron_scheduler.stop()
def start(self): def start(self):
""" Start the daemon """ """ Start the daemon """
@ -175,7 +178,8 @@ class Daemon:
# Start the cron scheduler # Start the cron scheduler
if Config.get_cronjobs(): if Config.get_cronjobs():
CronScheduler(jobs=Config.get_cronjobs()).start() self.cron_scheduler = CronScheduler(jobs=Config.get_cronjobs())
self.cron_scheduler.start()
self.bus.post(ApplicationStartedEvent()) self.bus.post(ApplicationStartedEvent())
@ -185,9 +189,10 @@ class Daemon:
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info('SIGINT received, terminating application') logger.info('SIGINT received, terminating application')
finally: finally:
self.bus.post(ApplicationStoppedEvent())
self.stop_app() self.stop_app()
sys.exit(0)
def main(): def main():
""" """

View file

@ -22,7 +22,7 @@ from platypush.utils import set_timeout, clear_timeout, \
from platypush import __version__ from platypush import __version__
from platypush.event import EventGenerator from platypush.event import EventGenerator
from platypush.message import Message from platypush.message import Message
from platypush.message.event import Event, StopEvent from platypush.message.event import Event
from platypush.message.request import Request from platypush.message.request import Request
from platypush.message.response import Response from platypush.message.response import Response
@ -62,7 +62,6 @@ class Backend(Thread, EventGenerator):
self.poll_seconds = float(poll_seconds) if poll_seconds else None self.poll_seconds = float(poll_seconds) if poll_seconds else None
self.device_id = Config.get('device_id') self.device_id = Config.get('device_id')
self.thread_id = None self.thread_id = None
self._should_stop = False
self._stop_event = threading.Event() self._stop_event = threading.Event()
self._kwargs = kwargs self._kwargs = kwargs
self.logger = logging.getLogger('platypush:backend:' + get_backend_name_by_class(self.__class__)) self.logger = logging.getLogger('platypush:backend:' + get_backend_name_by_class(self.__class__))
@ -103,10 +102,6 @@ class Backend(Thread, EventGenerator):
self.stop() self.stop()
return return
if isinstance(msg, StopEvent) and msg.targets_me():
self.logger.info('Received STOP event on {}'.format(self.__class__.__name__))
self._should_stop = True
else:
msg.backend = self # Augment message to be able to process responses msg.backend = self # Augment message to be able to process responses
self.bus.post(msg) self.bus.post(msg)
@ -263,22 +258,19 @@ class Backend(Thread, EventGenerator):
def on_stop(self): def on_stop(self):
""" Callback invoked when the process stops """ """ Callback invoked when the process stops """
self.unregister_service() pass
def stop(self): def stop(self):
""" Stops the backend thread by sending a STOP event on its bus """ """ Stops the backend thread by sending a STOP event on its bus """
def _async_stop(): def _async_stop():
evt = StopEvent(target=self.device_id, origin=self.device_id,
thread_id=self.thread_id)
self.send_message(evt)
self._stop_event.set() self._stop_event.set()
self.unregister_service()
self.on_stop() self.on_stop()
Thread(target=_async_stop).start() Thread(target=_async_stop).start()
def should_stop(self): def should_stop(self):
return self._should_stop return self._stop_event.is_set()
def wait_stop(self, timeout=None) -> bool: def wait_stop(self, timeout=None) -> bool:
return self._stop_event.wait(timeout) return self._stop_event.wait(timeout)

View file

@ -122,7 +122,7 @@ class CameraPiBackend(Backend):
while True: while True:
self.camera.wait_recording(2) self.camera.wait_recording(2)
else: else:
while True: while not self.should_stop():
connection = self.server_socket.accept()[0].makefile('wb') connection = self.server_socket.accept()[0].makefile('wb')
self.logger.info('Accepted client connection on port {}'.format(self.listen_port)) self.logger.info('Accepted client connection on port {}'.format(self.listen_port))

View file

@ -26,6 +26,7 @@ class ClipboardBackend(Backend):
self._last_text: Optional[str] = None self._last_text: Optional[str] = None
def run(self): def run(self):
self.logger.info('Started clipboard monitor backend')
while not self.should_stop(): while not self.should_stop():
text = pyperclip.paste() text = pyperclip.paste()
if text and text != self._last_text: if text and text != self._last_text:
@ -34,5 +35,6 @@ class ClipboardBackend(Backend):
self._last_text = text self._last_text = text
time.sleep(0.1) time.sleep(0.1)
self.logger.info('Stopped clipboard monitor backend')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -165,7 +165,7 @@ class GithubBackend(Backend):
def _events_monitor(self, uri: str, method: str = 'get'): def _events_monitor(self, uri: str, method: str = 'get'):
def thread(): def thread():
while True: while not self.should_stop():
try: try:
events = self._request(uri, method) events = self._request(uri, method)
if not events: if not events:

View file

@ -171,6 +171,7 @@ class HttpBackend(Backend):
def __init__(self, port=_DEFAULT_HTTP_PORT, def __init__(self, port=_DEFAULT_HTTP_PORT,
websocket_port=_DEFAULT_WEBSOCKET_PORT, websocket_port=_DEFAULT_WEBSOCKET_PORT,
bind_address='0.0.0.0',
disable_websocket=False, resource_dirs=None, disable_websocket=False, resource_dirs=None,
ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None,
maps=None, run_externally=False, uwsgi_args=None, **kwargs): maps=None, run_externally=False, uwsgi_args=None, **kwargs):
@ -181,6 +182,9 @@ class HttpBackend(Backend):
:param websocket_port: Listen port for the websocket server (default: 8009) :param websocket_port: Listen port for the websocket server (default: 8009)
:type websocket_port: int :type websocket_port: int
:param bind_address: Address/interface to bind to (default: 0.0.0.0, accept connection from any IP)
:type bind_address: str
:param disable_websocket: Disable the websocket interface (default: False) :param disable_websocket: Disable the websocket interface (default: False)
:type disable_websocket: bool :type disable_websocket: bool
@ -233,6 +237,8 @@ class HttpBackend(Backend):
self.server_proc = None self.server_proc = None
self.disable_websocket = disable_websocket self.disable_websocket = disable_websocket
self.websocket_thread = None self.websocket_thread = None
self._websocket_loop = None
self.bind_address = bind_address
if resource_dirs: if resource_dirs:
self.resource_dirs = {name: os.path.abspath( self.resource_dirs = {name: os.path.abspath(
@ -271,10 +277,24 @@ class HttpBackend(Backend):
if self.server_proc: if self.server_proc:
if isinstance(self.server_proc, subprocess.Popen): if isinstance(self.server_proc, subprocess.Popen):
self.server_proc.kill() self.server_proc.kill()
self.server_proc.wait() self.server_proc.wait(timeout=10)
if self.server_proc.poll() is not None:
self.logger.warning('HTTP server process still alive at termination')
else:
self.logger.info('HTTP server process terminated')
else: else:
self.server_proc.terminate() self.server_proc.terminate()
self.server_proc.join() self.server_proc.join(timeout=10)
if self.server_proc.is_alive():
self.server_proc.kill()
if self.server_proc.is_alive():
self.logger.warning('HTTP server process still alive at termination')
else:
self.logger.info('HTTP server process terminated')
if self.websocket_thread and self.websocket_thread.is_alive() and self._websocket_loop:
self._websocket_loop.stop()
self.logger.info('HTTP websocket service terminated')
def _acquire_websocket_lock(self, ws): def _acquire_websocket_lock(self, ws):
try: try:
@ -355,17 +375,17 @@ class HttpBackend(Backend):
if self.ssl_context: if self.ssl_context:
websocket_args['ssl'] = self.ssl_context websocket_args['ssl'] = self.ssl_context
loop = get_or_create_event_loop() self._websocket_loop = get_or_create_event_loop()
loop.run_until_complete( self._websocket_loop.run_until_complete(
websockets.serve(register_websocket, '0.0.0.0', self.websocket_port, websockets.serve(register_websocket, self.bind_address, self.websocket_port,
**websocket_args)) **websocket_args))
loop.run_forever() self._websocket_loop.run_forever()
def _start_web_server(self): def _start_web_server(self):
def proc(): def proc():
self.logger.info('Starting local web server on port {}'.format(self.port)) self.logger.info('Starting local web server on port {}'.format(self.port))
kwargs = { kwargs = {
'host': '0.0.0.0', 'host': self.bind_address,
'port': self.port, 'port': self.port,
'use_reloader': False, 'use_reloader': False,
'debug': False, 'debug': False,

View file

@ -1,5 +1,3 @@
import time
from threading import Thread from threading import Thread
from platypush.backend import Backend from platypush.backend import Backend
@ -80,12 +78,13 @@ class LightHueBackend(Backend):
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
finally: finally:
time.sleep(self.poll_seconds) self.wait_stop(self.poll_seconds)
return _thread return _thread
def run(self): def run(self):
super().run() super().run()
self.logger.info('Starting Hue lights backend')
while not self.should_stop(): while not self.should_stop():
try: try:
@ -94,7 +93,9 @@ class LightHueBackend(Backend):
poll_thread.join() poll_thread.join()
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
time.sleep(self.poll_seconds) self.wait_stop(self.poll_seconds)
self.logger.info('Stopped Hue lights backend')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,20 +1,16 @@
import json
import os import os
import time import time
import threading
from .. import Backend from .. import Backend
from platypush.message import Message from platypush.message import Message
from platypush.message.event import StopEvent
from platypush.message.request import Request
from platypush.message.response import Response from platypush.message.response import Response
class LocalBackend(Backend): class LocalBackend(Backend):
""" """
Sends and receive messages on two distinct local FIFOs, one for Sends and receive messages on two distinct local FIFOs, one for the requests and one for the responses.
the requests and one for the responses. This is a legacy backend that should only be used for testing purposes.
You can use this backend either to send local commands to push through You can use this backend either to send local commands to push through
Pusher (or any other script), or debug. You can even send command on the Pusher (or any other script), or debug. You can even send command on the
@ -41,8 +37,7 @@ class LocalBackend(Backend):
try: os.mkfifo(self.response_fifo) try: os.mkfifo(self.response_fifo)
except FileExistsError as e: pass except FileExistsError as e: pass
def send_message(self, msg, **kwargs):
def send_message(self, msg):
fifo = self.response_fifo \ fifo = self.response_fifo \
if isinstance(msg, Response) or self._request_context \ if isinstance(msg, Response) or self._request_context \
else self.request_fifo else self.request_fifo

View file

@ -334,17 +334,17 @@ class MqttBackend(Backend):
self.add_listeners(*self.listeners_conf) self.add_listeners(*self.listeners_conf)
def stop(self): def on_stop(self):
self.logger.info('Received STOP event on the MQTT backend') self.logger.info('Received STOP event on the MQTT backend')
for ((host, port, _), listener) in self._listeners.items(): for ((host, port, _), listener) in self._listeners.items():
try: try:
listener.loop_stop() listener.stop()
listener.disconnect()
except Exception as e: except Exception as e:
# noinspection PyProtectedMember # noinspection PyProtectedMember
self.logger.warning('Could not stop listener {host}:{port}: {error}'. self.logger.warning('Could not stop listener {host}:{port}: {error}'.
format(host=host, port=port, error=str(e))) format(host=host, port=port, error=str(e)))
self.logger.info('MQTT backend terminated')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -224,6 +224,8 @@ class MusicMopidyBackend(Backend):
self._connected_event.clear() self._connected_event.clear()
self._ws = None self._ws = None
self.logger.warning('Mopidy websocket connection closed') self.logger.warning('Mopidy websocket connection closed')
if not self.should_stop():
self._retry_connect() self._retry_connect()
return hndl return hndl
@ -252,5 +254,12 @@ class MusicMopidyBackend(Backend):
self.logger.info('Started tracking Mopidy events backend on {}:{}'.format(self.host, self.port)) self.logger.info('Started tracking Mopidy events backend on {}:{}'.format(self.host, self.port))
self._connect() self._connect()
def on_stop(self):
self.logger.info('Received STOP event on the Mopidy backend')
if self._ws:
self._ws.close()
self.logger.info('Mopidy backend terminated')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,4 +1,5 @@
import json import json
import select
import socket import socket
import threading import threading
import time import time
@ -94,9 +95,16 @@ class MusicSnapcastBackend(Backend):
@classmethod @classmethod
def _recv(cls, sock): def _recv(cls, sock):
sock.setblocking(0)
buf = b'' buf = b''
while buf[-2:] != cls._SOCKET_EOL: while buf[-2:] != cls._SOCKET_EOL:
ready = select.select([sock], [], [], 0.5)
if ready[0]:
buf += sock.recv(1) buf += sock.recv(1)
else:
return
return json.loads(buf.decode().strip()) return json.loads(buf.decode().strip())
@classmethod @classmethod
@ -150,6 +158,8 @@ class MusicSnapcastBackend(Backend):
sock = self._connect(host, port) sock = self._connect(host, port)
msgs = self._recv(sock) msgs = self._recv(sock)
if msgs is None:
continue
if not isinstance(msgs, list): if not isinstance(msgs, list):
msgs = [msgs] msgs = [msgs]
@ -157,6 +167,7 @@ class MusicSnapcastBackend(Backend):
self.logger.debug('Received message on {host}:{port}: {msg}'. self.logger.debug('Received message on {host}:{port}: {msg}'.
format(host=host, port=port, msg=msg)) format(host=host, port=port, msg=msg))
# noinspection PyTypeChecker
evt = self._parse_msg(host=host, msg=msg) evt = self._parse_msg(host=host, msg=msg)
if evt: if evt:
self.bus.post(evt) self.bus.post(evt)
@ -191,5 +202,17 @@ class MusicSnapcastBackend(Backend):
for host in self.hosts: for host in self.hosts:
self._threads[host].join() self._threads[host].join()
self.logger.info('Snapcast backend terminated')
def on_stop(self):
self.logger.info('Received STOP event on the Snapcast backend')
for host, sock in self._socks.items():
if sock:
try:
sock.close()
except Exception as e:
self.logger.warning('Could not close Snapcast connection to {}: {}: {}'.format(
host, type(e), str(e)))
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -127,8 +127,10 @@ class PushbulletBackend(Backend):
self.listener = None self.listener = None
def on_stop(self): def on_stop(self):
self.logger.info('Received STOP event on the Pushbullet backend')
super().on_stop() super().on_stop()
return self.close() self.close()
self.logger.info('Pushbullet backend terminated')
def on_error(self, e): def on_error(self, e):
self.logger.exception(e) self.logger.exception(e)

View file

@ -53,10 +53,13 @@ class RedisBackend(Backend):
# noinspection PyBroadException # noinspection PyBroadException
def get_message(self, queue_name=None): def get_message(self, queue_name=None):
queue = queue_name or self.queue queue = queue_name or self.queue
msg = self.redis.blpop(queue)[1].decode('utf-8') data = self.redis.blpop(queue, timeout=1)
if data is None:
return
msg = data[1].decode()
try: try:
msg = Message.build(json.loads(msg)) msg = Message.build(msg)
except: except:
try: try:
import ast import ast
@ -78,11 +81,16 @@ class RedisBackend(Backend):
while not self.should_stop(): while not self.should_stop():
try: try:
msg = self.get_message() msg = self.get_message()
if not msg:
continue
self.logger.info('Received message on the Redis backend: {}'. self.logger.info('Received message on the Redis backend: {}'.
format(msg)) format(msg))
self.on_message(msg) self.on_message(msg)
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
self.logger.info('Redis backend terminated')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -21,7 +21,7 @@ class SensorIrZeroborgBackend(Backend):
* :class:`platypush.message.event.sensor.ir.IrKeyUpEvent` when a key is released * :class:`platypush.message.event.sensor.ir.IrKeyUpEvent` when a key is released
""" """
last_message =None last_message = None
last_message_timestamp = None last_message_timestamp = None
def __init__(self, no_message_timeout=0.37, **kwargs): def __init__(self, no_message_timeout=0.37, **kwargs):
@ -31,11 +31,10 @@ class SensorIrZeroborgBackend(Backend):
self.zb.Init() self.zb.Init()
self.logger.info('Initialized Zeroborg infrared sensor backend') self.logger.info('Initialized Zeroborg infrared sensor backend')
def run(self): def run(self):
super().run() super().run()
while True: while not self.should_stop():
try: try:
self.zb.GetIrMessage() self.zb.GetIrMessage()
if self.zb.HasNewIrMessage(): if self.zb.HasNewIrMessage():
@ -47,7 +46,7 @@ class SensorIrZeroborgBackend(Backend):
self.last_message = message self.last_message = message
self.last_message_timestamp = time.time() self.last_message_timestamp = time.time()
except OSError as e: except OSError as e:
self.logger.warning('Failed reading IR sensor status') self.logger.warning('Failed reading IR sensor status: {}: {}'.format(type(e), str(e)))
if self.last_message_timestamp and \ if self.last_message_timestamp and \
time.time() - self.last_message_timestamp > self.no_message_timeout: time.time() - self.last_message_timestamp > self.no_message_timeout:
@ -57,6 +56,4 @@ class SensorIrZeroborgBackend(Backend):
self.last_message = None self.last_message = None
self.last_message_timestamp = None self.last_message_timestamp = None
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -39,7 +39,7 @@ class TcpBackend(Backend):
msg = b'' msg = b''
prev_ch = None prev_ch = None
while True: while not self.should_stop():
if processed_bytes > self._MAX_REQ_SIZE: if processed_bytes > self._MAX_REQ_SIZE:
self.logger.warning('Ignoring message longer than {} bytes from {}' self.logger.warning('Ignoring message longer than {} bytes from {}'
.format(self._MAX_REQ_SIZE, address[0])) .format(self._MAX_REQ_SIZE, address[0]))
@ -95,6 +95,8 @@ class TcpBackend(Backend):
serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv_sock.bind((self.bind_address, self.port)) serv_sock.bind((self.bind_address, self.port))
serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv_sock.settimeout(0.5)
self.logger.info('Initialized TCP backend on port {} with bind address {}'. self.logger.info('Initialized TCP backend on port {} with bind address {}'.
format(self.port, self.bind_address)) format(self.port, self.bind_address))
@ -102,9 +104,15 @@ class TcpBackend(Backend):
serv_sock.listen(self.listen_queue) serv_sock.listen(self.listen_queue)
while not self.should_stop(): while not self.should_stop():
try:
(sock, address) = serv_sock.accept() (sock, address) = serv_sock.accept()
except socket.timeout:
continue
self.logger.info('Accepted connection from client {}'.format(address[0])) self.logger.info('Accepted connection from client {}'.format(address[0]))
self._process_client(sock, address) self._process_client(sock, address)
self.logger.info('TCP backend terminated')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -56,6 +56,7 @@ class WebsocketBackend(Backend):
self.bind_address = bind_address self.bind_address = bind_address
self.client_timeout = client_timeout self.client_timeout = client_timeout
self.active_websockets = set() self.active_websockets = set()
self._loop = None
self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert, self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert,
ssl_key=ssl_key, ssl_key=ssl_key,
@ -104,7 +105,7 @@ class WebsocketBackend(Backend):
format(websocket.remote_address[0])) format(websocket.remote_address[0]))
try: try:
while True: while not self.should_stop():
if self.client_timeout: if self.client_timeout:
msg = await asyncio.wait_for(websocket.recv(), msg = await asyncio.wait_for(websocket.recv(),
timeout=self.client_timeout) timeout=self.client_timeout)
@ -142,12 +143,19 @@ class WebsocketBackend(Backend):
if self.ssl_context: if self.ssl_context:
websocket_args['ssl'] = self.ssl_context websocket_args['ssl'] = self.ssl_context
loop = get_or_create_event_loop() self._loop = get_or_create_event_loop()
server = websockets.serve(serve_client, self.bind_address, self.port, server = websockets.serve(serve_client, self.bind_address, self.port,
**websocket_args) **websocket_args)
loop.run_until_complete(server) self._loop.run_until_complete(server)
loop.run_forever() self._loop.run_forever()
def on_stop(self):
self.logger.info('Received STOP event on the websocket backend')
if self._loop:
self._loop.stop()
self.logger.info('Websocket backend terminated')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -2,11 +2,10 @@ import logging
import threading import threading
import time import time
from queue import Queue from queue import Queue, Empty
from typing import Callable, Type from typing import Callable, Type
from platypush.config import Config from platypush.message.event import Event
from platypush.message.event import Event, StopEvent
logger = logging.getLogger('platypush:bus') logger = logging.getLogger('platypush:bus')
@ -21,6 +20,7 @@ class Bus(object):
self.on_message = on_message self.on_message = on_message
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()
self.event_handlers = {} self.event_handlers = {}
self._should_stop = threading.Event()
def post(self, msg): def post(self, msg):
""" Sends a message to the bus """ """ Sends a message to the bus """
@ -28,15 +28,13 @@ class Bus(object):
def get(self): def get(self):
""" Reads one message from the bus """ """ Reads one message from the bus """
return self.bus.get() try:
return self.bus.get(timeout=0.1)
except Empty:
return
def stop(self): def stop(self):
""" Stops the bus by sending a STOP event """ self._should_stop.set()
evt = StopEvent(target=Config.get('device_id'),
origin=Config.get('device_id'),
thread_id=self.thread_id)
self.post(evt)
def _msg_executor(self, msg): def _msg_executor(self, msg):
def event_handler(event: Event, handler: Callable[[Event], None]): def event_handler(event: Event, handler: Callable[[Event], None]):
@ -62,18 +60,22 @@ class Bus(object):
return executor return executor
def should_stop(self):
return self._should_stop.is_set()
def poll(self): def poll(self):
""" """
Reads messages from the bus until either stop event message or KeyboardInterrupt Reads messages from the bus until either stop event message or KeyboardInterrupt
""" """
if not self.on_message: if not self.on_message:
logger.warning('No message handlers installed, cannot poll') logger.warning('No message handlers installed, cannot poll')
return return
stop = False while not self.should_stop():
while not stop:
msg = self.get() msg = self.get()
if msg is None:
continue
timestamp = msg.timestamp if hasattr(msg, 'timestamp') else msg.get('timestamp') timestamp = msg.timestamp if hasattr(msg, 'timestamp') else msg.get('timestamp')
if timestamp and time.time() - timestamp > self._MSG_EXPIRY_TIMEOUT: if timestamp and time.time() - timestamp > self._MSG_EXPIRY_TIMEOUT:
logger.debug('{} seconds old message on the bus expired, ignoring it: {}'. logger.debug('{} seconds old message on the bus expired, ignoring it: {}'.
@ -82,9 +84,7 @@ class Bus(object):
threading.Thread(target=self._msg_executor(msg)).start() threading.Thread(target=self._msg_executor(msg)).start()
if isinstance(msg, StopEvent) and msg.targets_me(): logger.info('Bus service stoppped')
logger.info('Received STOP event on the bus')
stop = True
def register_handler(self, event_type: Type[Event], handler: Callable[[Event], None]) -> Callable[[], None]: def register_handler(self, event_type: Type[Event], handler: Callable[[Event], None]) -> Callable[[], None]:
""" """

View file

@ -32,9 +32,11 @@ class RedisBus(Bus):
def get(self): def get(self):
""" Reads one message from the Redis queue """ """ Reads one message from the Redis queue """
msg = None msg = None
try: try:
msg = self.redis.blpop(self.redis_queue) if self.should_stop():
return
msg = self.redis.blpop(self.redis_queue, timeout=1)
if not msg or msg[1] is None: if not msg or msg[1] is None:
return return
@ -54,5 +56,9 @@ class RedisBus(Bus):
""" Sends a message to the Redis queue """ """ Sends a message to the Redis queue """
return self.redis.rpush(self.redis_queue, str(msg)) return self.redis.rpush(self.redis_queue, str(msg))
def stop(self):
super().stop()
self.redis.close()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,11 +1,10 @@
import enum import enum
import logging import logging
import threading
import time import time
import croniter import croniter
from threading import Thread
from platypush.procedure import Procedure from platypush.procedure import Procedure
from platypush.utils import is_functional_cron from platypush.utils import is_functional_cron
@ -20,12 +19,13 @@ class CronjobState(enum.IntEnum):
ERROR = 4 ERROR = 4
class Cronjob(Thread): class Cronjob(threading.Thread):
def __init__(self, name, cron_expression, actions): def __init__(self, name, cron_expression, actions):
super().__init__() super().__init__()
self.cron_expression = cron_expression self.cron_expression = cron_expression
self.name = name self.name = name
self.state = CronjobState.IDLE self.state = CronjobState.IDLE
self._should_stop = threading.Event()
if isinstance(actions, dict) or isinstance(actions, list): if isinstance(actions, dict) or isinstance(actions, list):
self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions)
@ -35,6 +35,9 @@ class Cronjob(Thread):
def run(self): def run(self):
self.state = CronjobState.WAIT self.state = CronjobState.WAIT
self.wait() self.wait()
if self.should_stop():
return
self.state = CronjobState.RUNNING self.state = CronjobState.RUNNING
try: try:
@ -56,7 +59,7 @@ class Cronjob(Thread):
now = int(time.time()) now = int(time.time())
cron = croniter.croniter(self.cron_expression, now) cron = croniter.croniter(self.cron_expression, now)
next_run = int(cron.get_next()) next_run = int(cron.get_next())
time.sleep(next_run - now) self._should_stop.wait(next_run - now)
def should_run(self): def should_run(self):
now = int(time.time()) now = int(time.time())
@ -64,12 +67,19 @@ class Cronjob(Thread):
next_run = int(cron.get_next()) next_run = int(cron.get_next())
return now == next_run return now == next_run
def stop(self):
self._should_stop.set()
class CronScheduler(Thread): def should_stop(self):
return self._should_stop.is_set()
class CronScheduler(threading.Thread):
def __init__(self, jobs): def __init__(self, jobs):
super().__init__() super().__init__()
self.jobs_config = jobs self.jobs_config = jobs
self._jobs = {} self._jobs = {}
self._should_stop = threading.Event()
logger.info('Cron scheduler initialized with {} jobs'. logger.info('Cron scheduler initialized with {} jobs'.
format(len(self.jobs_config.keys()))) format(len(self.jobs_config.keys())))
@ -90,10 +100,18 @@ class CronScheduler(Thread):
return self._jobs[name] return self._jobs[name]
def stop(self):
for job in self._jobs.values():
job.stop()
self._should_stop.set()
def should_stop(self):
return self._should_stop.is_set()
def run(self): def run(self):
logger.info('Running cron scheduler') logger.info('Running cron scheduler')
while True: while not self.should_stop():
for (job_name, job_config) in self.jobs_config.items(): for (job_name, job_config) in self.jobs_config.items():
job = self._get_job(name=job_name, config=job_config) job = self._get_job(name=job_name, config=job_config)
if job.state == CronjobState.IDLE: if job.state == CronjobState.IDLE:
@ -101,5 +119,7 @@ class CronScheduler(Thread):
time.sleep(0.5) time.sleep(0.5)
logger.info('Terminating cron scheduler')
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -2,7 +2,6 @@ import copy
import json import json
import random import random
import re import re
import threading
import time import time
from datetime import date from datetime import date
@ -199,29 +198,6 @@ class EventMatchResult(object):
self.parsed_args = {} if not parsed_args else parsed_args self.parsed_args = {} if not parsed_args else parsed_args
# XXX Should be a stop Request, not an Event
class StopEvent(Event):
""" StopEvent message. When received on a Bus, it will terminate the
listening thread having the specified ID. Useful to keep listeners in
sync and make them quit when the application terminates """
def __init__(self, target, origin, thread_id, id=None, **kwargs):
""" Constructor.
:param target: Target node
:param origin: Origin node
:param thread_id: thread_iden() to be terminated if listening on the bus
:param id: Event ID (default: auto-generated)
:param kwargs: Extra key-value arguments
"""
super().__init__(target=target, origin=origin, id=id,
thread_id=thread_id, **kwargs)
def targets_me(self):
""" Returns true if the stop event is for the current thread """
return self.args['thread_id'] == threading.get_ident()
def flatten(args): def flatten(args):
if isinstance(args, dict): if isinstance(args, dict):
for (key, value) in args.items(): for (key, value) in args.items():