From 2800bac3fbd506003f0e134305b5735c4bf6aa82 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 23 Feb 2021 23:07:35 +0100 Subject: [PATCH] Make sure that all hanging threads, backends and services are stopped and their resources cleaned up when the application stops. --- platypush/__init__.py | 17 ++++++---- platypush/backend/__init__.py | 20 ++++------- platypush/backend/camera/pi.py | 2 +- platypush/backend/clipboard.py | 2 ++ platypush/backend/github.py | 2 +- platypush/backend/http/__init__.py | 34 +++++++++++++++---- platypush/backend/light/hue.py | 9 ++--- platypush/backend/local/__init__.py | 11 ++---- platypush/backend/mqtt.py | 6 ++-- platypush/backend/music/mopidy.py | 11 +++++- platypush/backend/music/snapcast.py | 25 +++++++++++++- platypush/backend/pushbullet/__init__.py | 4 ++- platypush/backend/redis.py | 12 +++++-- .../backend/sensor/ir/zeroborg/__init__.py | 9 ++--- platypush/backend/tcp.py | 12 +++++-- platypush/backend/websocket.py | 16 ++++++--- platypush/bus/__init__.py | 32 ++++++++--------- platypush/bus/redis.py | 10 ++++-- platypush/cron/scheduler.py | 32 +++++++++++++---- platypush/message/event/__init__.py | 24 ------------- 20 files changed, 181 insertions(+), 109 deletions(-) diff --git a/platypush/__init__.py b/platypush/__init__.py index ca4daea88b..ade16fa743 100644 --- a/platypush/__init__.py +++ b/platypush/__init__.py @@ -16,7 +16,7 @@ from .context import register_backends from .cron.scheduler import CronScheduler from .event.processor import EventProcessor from .logger import Logger -from .message.event import Event, StopEvent +from .message.event import Event from .message.event.application import ApplicationStartedEvent, ApplicationStoppedEvent from .message.request import Request from .message.response import Response @@ -80,6 +80,7 @@ class Daemon: self.event_processor = EventProcessor() self.requests_to_process = requests_to_process self.processed_requests = 0 + self.cron_scheduler = None @classmethod def build_from_cmdline(cls, args): @@ -135,9 +136,6 @@ class Daemon: self.stop_app() elif isinstance(msg, Response): logger.info('Received response: {}'.format(msg)) - elif isinstance(msg, StopEvent) and msg.targets_me(): - logger.info('Received STOP event: {}'.format(msg)) - self.stop_app() elif isinstance(msg, Event): if not msg.disable_logging: logger.info('Received event: {}'.format(msg)) @@ -147,9 +145,14 @@ class Daemon: def stop_app(self): """ Stops the backends and the bus """ + self.bus.post(ApplicationStoppedEvent()) + for backend in self.backends.values(): backend.stop() + self.bus.stop() + if self.cron_scheduler: + self.cron_scheduler.stop() def start(self): """ Start the daemon """ @@ -175,7 +178,8 @@ class Daemon: # Start the cron scheduler if Config.get_cronjobs(): - CronScheduler(jobs=Config.get_cronjobs()).start() + self.cron_scheduler = CronScheduler(jobs=Config.get_cronjobs()) + self.cron_scheduler.start() self.bus.post(ApplicationStartedEvent()) @@ -185,9 +189,10 @@ class Daemon: except KeyboardInterrupt: logger.info('SIGINT received, terminating application') finally: - self.bus.post(ApplicationStoppedEvent()) self.stop_app() + sys.exit(0) + def main(): """ diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index ce7f39c08d..bcd6e670df 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -22,7 +22,7 @@ from platypush.utils import set_timeout, clear_timeout, \ from platypush import __version__ from platypush.event import EventGenerator from platypush.message import Message -from platypush.message.event import Event, StopEvent +from platypush.message.event import Event from platypush.message.request import Request from platypush.message.response import Response @@ -62,7 +62,6 @@ class Backend(Thread, EventGenerator): self.poll_seconds = float(poll_seconds) if poll_seconds else None self.device_id = Config.get('device_id') self.thread_id = None - self._should_stop = False self._stop_event = threading.Event() self._kwargs = kwargs self.logger = logging.getLogger('platypush:backend:' + get_backend_name_by_class(self.__class__)) @@ -103,12 +102,8 @@ class Backend(Thread, EventGenerator): self.stop() return - if isinstance(msg, StopEvent) and msg.targets_me(): - self.logger.info('Received STOP event on {}'.format(self.__class__.__name__)) - self._should_stop = True - else: - msg.backend = self # Augment message to be able to process responses - self.bus.post(msg) + msg.backend = self # Augment message to be able to process responses + self.bus.post(msg) def _is_expected_response(self, msg): """ Internal only - returns true if we are expecting for a response @@ -263,22 +258,19 @@ class Backend(Thread, EventGenerator): def on_stop(self): """ Callback invoked when the process stops """ - self.unregister_service() + pass def stop(self): """ Stops the backend thread by sending a STOP event on its bus """ def _async_stop(): - evt = StopEvent(target=self.device_id, origin=self.device_id, - thread_id=self.thread_id) - - self.send_message(evt) self._stop_event.set() + self.unregister_service() self.on_stop() Thread(target=_async_stop).start() def should_stop(self): - return self._should_stop + return self._stop_event.is_set() def wait_stop(self, timeout=None) -> bool: return self._stop_event.wait(timeout) diff --git a/platypush/backend/camera/pi.py b/platypush/backend/camera/pi.py index e6ec3b74a6..b2f67dea73 100644 --- a/platypush/backend/camera/pi.py +++ b/platypush/backend/camera/pi.py @@ -122,7 +122,7 @@ class CameraPiBackend(Backend): while True: self.camera.wait_recording(2) else: - while True: + while not self.should_stop(): connection = self.server_socket.accept()[0].makefile('wb') self.logger.info('Accepted client connection on port {}'.format(self.listen_port)) diff --git a/platypush/backend/clipboard.py b/platypush/backend/clipboard.py index 78299a4f47..3fdbe5b70d 100644 --- a/platypush/backend/clipboard.py +++ b/platypush/backend/clipboard.py @@ -26,6 +26,7 @@ class ClipboardBackend(Backend): self._last_text: Optional[str] = None def run(self): + self.logger.info('Started clipboard monitor backend') while not self.should_stop(): text = pyperclip.paste() if text and text != self._last_text: @@ -34,5 +35,6 @@ class ClipboardBackend(Backend): self._last_text = text time.sleep(0.1) + self.logger.info('Stopped clipboard monitor backend') # vim:sw=4:ts=4:et: diff --git a/platypush/backend/github.py b/platypush/backend/github.py index b1fd3dfaba..f4a2e8fe03 100644 --- a/platypush/backend/github.py +++ b/platypush/backend/github.py @@ -165,7 +165,7 @@ class GithubBackend(Backend): def _events_monitor(self, uri: str, method: str = 'get'): def thread(): - while True: + while not self.should_stop(): try: events = self._request(uri, method) if not events: diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index fc7618893c..2501075f1c 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -171,6 +171,7 @@ class HttpBackend(Backend): def __init__(self, port=_DEFAULT_HTTP_PORT, websocket_port=_DEFAULT_WEBSOCKET_PORT, + bind_address='0.0.0.0', disable_websocket=False, resource_dirs=None, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None, maps=None, run_externally=False, uwsgi_args=None, **kwargs): @@ -181,6 +182,9 @@ class HttpBackend(Backend): :param websocket_port: Listen port for the websocket server (default: 8009) :type websocket_port: int + :param bind_address: Address/interface to bind to (default: 0.0.0.0, accept connection from any IP) + :type bind_address: str + :param disable_websocket: Disable the websocket interface (default: False) :type disable_websocket: bool @@ -233,6 +237,8 @@ class HttpBackend(Backend): self.server_proc = None self.disable_websocket = disable_websocket self.websocket_thread = None + self._websocket_loop = None + self.bind_address = bind_address if resource_dirs: self.resource_dirs = {name: os.path.abspath( @@ -271,10 +277,24 @@ class HttpBackend(Backend): if self.server_proc: if isinstance(self.server_proc, subprocess.Popen): self.server_proc.kill() - self.server_proc.wait() + self.server_proc.wait(timeout=10) + if self.server_proc.poll() is not None: + self.logger.warning('HTTP server process still alive at termination') + else: + self.logger.info('HTTP server process terminated') else: self.server_proc.terminate() - self.server_proc.join() + self.server_proc.join(timeout=10) + if self.server_proc.is_alive(): + self.server_proc.kill() + if self.server_proc.is_alive(): + self.logger.warning('HTTP server process still alive at termination') + else: + self.logger.info('HTTP server process terminated') + + if self.websocket_thread and self.websocket_thread.is_alive() and self._websocket_loop: + self._websocket_loop.stop() + self.logger.info('HTTP websocket service terminated') def _acquire_websocket_lock(self, ws): try: @@ -355,17 +375,17 @@ class HttpBackend(Backend): if self.ssl_context: websocket_args['ssl'] = self.ssl_context - loop = get_or_create_event_loop() - loop.run_until_complete( - websockets.serve(register_websocket, '0.0.0.0', self.websocket_port, + self._websocket_loop = get_or_create_event_loop() + self._websocket_loop.run_until_complete( + websockets.serve(register_websocket, self.bind_address, self.websocket_port, **websocket_args)) - loop.run_forever() + self._websocket_loop.run_forever() def _start_web_server(self): def proc(): self.logger.info('Starting local web server on port {}'.format(self.port)) kwargs = { - 'host': '0.0.0.0', + 'host': self.bind_address, 'port': self.port, 'use_reloader': False, 'debug': False, diff --git a/platypush/backend/light/hue.py b/platypush/backend/light/hue.py index 78fc4c5711..95cbed6ff8 100644 --- a/platypush/backend/light/hue.py +++ b/platypush/backend/light/hue.py @@ -1,5 +1,3 @@ -import time - from threading import Thread from platypush.backend import Backend @@ -80,12 +78,13 @@ class LightHueBackend(Backend): except Exception as e: self.logger.exception(e) finally: - time.sleep(self.poll_seconds) + self.wait_stop(self.poll_seconds) return _thread def run(self): super().run() + self.logger.info('Starting Hue lights backend') while not self.should_stop(): try: @@ -94,7 +93,9 @@ class LightHueBackend(Backend): poll_thread.join() except Exception as e: self.logger.exception(e) - time.sleep(self.poll_seconds) + self.wait_stop(self.poll_seconds) + + self.logger.info('Stopped Hue lights backend') # vim:sw=4:ts=4:et: diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py index 8c9186dad0..b3fa47ca4a 100644 --- a/platypush/backend/local/__init__.py +++ b/platypush/backend/local/__init__.py @@ -1,20 +1,16 @@ -import json import os import time -import threading from .. import Backend from platypush.message import Message -from platypush.message.event import StopEvent -from platypush.message.request import Request from platypush.message.response import Response class LocalBackend(Backend): """ - Sends and receive messages on two distinct local FIFOs, one for - the requests and one for the responses. + Sends and receive messages on two distinct local FIFOs, one for the requests and one for the responses. + This is a legacy backend that should only be used for testing purposes. You can use this backend either to send local commands to push through Pusher (or any other script), or debug. You can even send command on the @@ -41,8 +37,7 @@ class LocalBackend(Backend): try: os.mkfifo(self.response_fifo) except FileExistsError as e: pass - - def send_message(self, msg): + def send_message(self, msg, **kwargs): fifo = self.response_fifo \ if isinstance(msg, Response) or self._request_context \ else self.request_fifo diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index 40e583e561..cf27f942f8 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -334,17 +334,17 @@ class MqttBackend(Backend): self.add_listeners(*self.listeners_conf) - def stop(self): + def on_stop(self): self.logger.info('Received STOP event on the MQTT backend') for ((host, port, _), listener) in self._listeners.items(): try: - listener.loop_stop() - listener.disconnect() + listener.stop() except Exception as e: # noinspection PyProtectedMember self.logger.warning('Could not stop listener {host}:{port}: {error}'. format(host=host, port=port, error=str(e))) + self.logger.info('MQTT backend terminated') # vim:sw=4:ts=4:et: diff --git a/platypush/backend/music/mopidy.py b/platypush/backend/music/mopidy.py index 86ea153392..161fbd5ccf 100644 --- a/platypush/backend/music/mopidy.py +++ b/platypush/backend/music/mopidy.py @@ -224,7 +224,9 @@ class MusicMopidyBackend(Backend): self._connected_event.clear() self._ws = None self.logger.warning('Mopidy websocket connection closed') - self._retry_connect() + + if not self.should_stop(): + self._retry_connect() return hndl @@ -252,5 +254,12 @@ class MusicMopidyBackend(Backend): self.logger.info('Started tracking Mopidy events backend on {}:{}'.format(self.host, self.port)) self._connect() + def on_stop(self): + self.logger.info('Received STOP event on the Mopidy backend') + if self._ws: + self._ws.close() + + self.logger.info('Mopidy backend terminated') + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/music/snapcast.py b/platypush/backend/music/snapcast.py index b7cf9270c0..53ea555c02 100644 --- a/platypush/backend/music/snapcast.py +++ b/platypush/backend/music/snapcast.py @@ -1,4 +1,5 @@ import json +import select import socket import threading import time @@ -94,9 +95,16 @@ class MusicSnapcastBackend(Backend): @classmethod def _recv(cls, sock): + sock.setblocking(0) buf = b'' + while buf[-2:] != cls._SOCKET_EOL: - buf += sock.recv(1) + ready = select.select([sock], [], [], 0.5) + if ready[0]: + buf += sock.recv(1) + else: + return + return json.loads(buf.decode().strip()) @classmethod @@ -150,6 +158,8 @@ class MusicSnapcastBackend(Backend): sock = self._connect(host, port) msgs = self._recv(sock) + if msgs is None: + continue if not isinstance(msgs, list): msgs = [msgs] @@ -157,6 +167,7 @@ class MusicSnapcastBackend(Backend): self.logger.debug('Received message on {host}:{port}: {msg}'. format(host=host, port=port, msg=msg)) + # noinspection PyTypeChecker evt = self._parse_msg(host=host, msg=msg) if evt: self.bus.post(evt) @@ -191,5 +202,17 @@ class MusicSnapcastBackend(Backend): for host in self.hosts: self._threads[host].join() + self.logger.info('Snapcast backend terminated') + + def on_stop(self): + self.logger.info('Received STOP event on the Snapcast backend') + for host, sock in self._socks.items(): + if sock: + try: + sock.close() + except Exception as e: + self.logger.warning('Could not close Snapcast connection to {}: {}: {}'.format( + host, type(e), str(e))) + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index e1fc67e01d..724fa702ee 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -127,8 +127,10 @@ class PushbulletBackend(Backend): self.listener = None def on_stop(self): + self.logger.info('Received STOP event on the Pushbullet backend') super().on_stop() - return self.close() + self.close() + self.logger.info('Pushbullet backend terminated') def on_error(self, e): self.logger.exception(e) diff --git a/platypush/backend/redis.py b/platypush/backend/redis.py index 79feae85fb..876f694dda 100644 --- a/platypush/backend/redis.py +++ b/platypush/backend/redis.py @@ -53,10 +53,13 @@ class RedisBackend(Backend): # noinspection PyBroadException def get_message(self, queue_name=None): queue = queue_name or self.queue - msg = self.redis.blpop(queue)[1].decode('utf-8') + data = self.redis.blpop(queue, timeout=1) + if data is None: + return + msg = data[1].decode() try: - msg = Message.build(json.loads(msg)) + msg = Message.build(msg) except: try: import ast @@ -78,11 +81,16 @@ class RedisBackend(Backend): while not self.should_stop(): try: msg = self.get_message() + if not msg: + continue + self.logger.info('Received message on the Redis backend: {}'. format(msg)) self.on_message(msg) except Exception as e: self.logger.exception(e) + self.logger.info('Redis backend terminated') + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/sensor/ir/zeroborg/__init__.py b/platypush/backend/sensor/ir/zeroborg/__init__.py index a26af8b81e..076a6ea469 100644 --- a/platypush/backend/sensor/ir/zeroborg/__init__.py +++ b/platypush/backend/sensor/ir/zeroborg/__init__.py @@ -21,7 +21,7 @@ class SensorIrZeroborgBackend(Backend): * :class:`platypush.message.event.sensor.ir.IrKeyUpEvent` when a key is released """ - last_message =None + last_message = None last_message_timestamp = None def __init__(self, no_message_timeout=0.37, **kwargs): @@ -31,11 +31,10 @@ class SensorIrZeroborgBackend(Backend): self.zb.Init() self.logger.info('Initialized Zeroborg infrared sensor backend') - def run(self): super().run() - while True: + while not self.should_stop(): try: self.zb.GetIrMessage() if self.zb.HasNewIrMessage(): @@ -47,7 +46,7 @@ class SensorIrZeroborgBackend(Backend): self.last_message = message self.last_message_timestamp = time.time() except OSError as e: - self.logger.warning('Failed reading IR sensor status') + self.logger.warning('Failed reading IR sensor status: {}: {}'.format(type(e), str(e))) if self.last_message_timestamp and \ time.time() - self.last_message_timestamp > self.no_message_timeout: @@ -57,6 +56,4 @@ class SensorIrZeroborgBackend(Backend): self.last_message = None self.last_message_timestamp = None - # vim:sw=4:ts=4:et: - diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index 4d2945eeb3..996626cb41 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -39,7 +39,7 @@ class TcpBackend(Backend): msg = b'' prev_ch = None - while True: + while not self.should_stop(): if processed_bytes > self._MAX_REQ_SIZE: self.logger.warning('Ignoring message longer than {} bytes from {}' .format(self._MAX_REQ_SIZE, address[0])) @@ -95,6 +95,8 @@ class TcpBackend(Backend): serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv_sock.bind((self.bind_address, self.port)) + serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + serv_sock.settimeout(0.5) self.logger.info('Initialized TCP backend on port {} with bind address {}'. format(self.port, self.bind_address)) @@ -102,9 +104,15 @@ class TcpBackend(Backend): serv_sock.listen(self.listen_queue) while not self.should_stop(): - (sock, address) = serv_sock.accept() + try: + (sock, address) = serv_sock.accept() + except socket.timeout: + continue + self.logger.info('Accepted connection from client {}'.format(address[0])) self._process_client(sock, address) + self.logger.info('TCP backend terminated') + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/websocket.py b/platypush/backend/websocket.py index 456cd0cf2b..38c60aac3e 100644 --- a/platypush/backend/websocket.py +++ b/platypush/backend/websocket.py @@ -56,6 +56,7 @@ class WebsocketBackend(Backend): self.bind_address = bind_address self.client_timeout = client_timeout self.active_websockets = set() + self._loop = None self.ssl_context = get_ssl_server_context(ssl_cert=ssl_cert, ssl_key=ssl_key, @@ -104,7 +105,7 @@ class WebsocketBackend(Backend): format(websocket.remote_address[0])) try: - while True: + while not self.should_stop(): if self.client_timeout: msg = await asyncio.wait_for(websocket.recv(), timeout=self.client_timeout) @@ -142,12 +143,19 @@ class WebsocketBackend(Backend): if self.ssl_context: websocket_args['ssl'] = self.ssl_context - loop = get_or_create_event_loop() + self._loop = get_or_create_event_loop() server = websockets.serve(serve_client, self.bind_address, self.port, **websocket_args) - loop.run_until_complete(server) - loop.run_forever() + self._loop.run_until_complete(server) + self._loop.run_forever() + + def on_stop(self): + self.logger.info('Received STOP event on the websocket backend') + if self._loop: + self._loop.stop() + + self.logger.info('Websocket backend terminated') # vim:sw=4:ts=4:et: diff --git a/platypush/bus/__init__.py b/platypush/bus/__init__.py index 8ad5517e35..86d889493f 100644 --- a/platypush/bus/__init__.py +++ b/platypush/bus/__init__.py @@ -2,11 +2,10 @@ import logging import threading import time -from queue import Queue +from queue import Queue, Empty from typing import Callable, Type -from platypush.config import Config -from platypush.message.event import Event, StopEvent +from platypush.message.event import Event logger = logging.getLogger('platypush:bus') @@ -21,6 +20,7 @@ class Bus(object): self.on_message = on_message self.thread_id = threading.get_ident() self.event_handlers = {} + self._should_stop = threading.Event() def post(self, msg): """ Sends a message to the bus """ @@ -28,15 +28,13 @@ class Bus(object): def get(self): """ Reads one message from the bus """ - return self.bus.get() + try: + return self.bus.get(timeout=0.1) + except Empty: + return def stop(self): - """ Stops the bus by sending a STOP event """ - evt = StopEvent(target=Config.get('device_id'), - origin=Config.get('device_id'), - thread_id=self.thread_id) - - self.post(evt) + self._should_stop.set() def _msg_executor(self, msg): def event_handler(event: Event, handler: Callable[[Event], None]): @@ -62,18 +60,22 @@ class Bus(object): return executor + def should_stop(self): + return self._should_stop.is_set() + def poll(self): """ Reads messages from the bus until either stop event message or KeyboardInterrupt """ - if not self.on_message: logger.warning('No message handlers installed, cannot poll') return - stop = False - while not stop: + while not self.should_stop(): msg = self.get() + if msg is None: + continue + timestamp = msg.timestamp if hasattr(msg, 'timestamp') else msg.get('timestamp') if timestamp and time.time() - timestamp > self._MSG_EXPIRY_TIMEOUT: logger.debug('{} seconds old message on the bus expired, ignoring it: {}'. @@ -82,9 +84,7 @@ class Bus(object): threading.Thread(target=self._msg_executor(msg)).start() - if isinstance(msg, StopEvent) and msg.targets_me(): - logger.info('Received STOP event on the bus') - stop = True + logger.info('Bus service stoppped') def register_handler(self, event_type: Type[Event], handler: Callable[[Event], None]) -> Callable[[], None]: """ diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index b4fd74773e..302b318226 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -32,9 +32,11 @@ class RedisBus(Bus): def get(self): """ Reads one message from the Redis queue """ msg = None - try: - msg = self.redis.blpop(self.redis_queue) + if self.should_stop(): + return + + msg = self.redis.blpop(self.redis_queue, timeout=1) if not msg or msg[1] is None: return @@ -54,5 +56,9 @@ class RedisBus(Bus): """ Sends a message to the Redis queue """ return self.redis.rpush(self.redis_queue, str(msg)) + def stop(self): + super().stop() + self.redis.close() + # vim:sw=4:ts=4:et: diff --git a/platypush/cron/scheduler.py b/platypush/cron/scheduler.py index b374eac0c1..c3a49b3903 100644 --- a/platypush/cron/scheduler.py +++ b/platypush/cron/scheduler.py @@ -1,11 +1,10 @@ import enum import logging +import threading import time import croniter -from threading import Thread - from platypush.procedure import Procedure from platypush.utils import is_functional_cron @@ -20,12 +19,13 @@ class CronjobState(enum.IntEnum): ERROR = 4 -class Cronjob(Thread): +class Cronjob(threading.Thread): def __init__(self, name, cron_expression, actions): super().__init__() self.cron_expression = cron_expression self.name = name self.state = CronjobState.IDLE + self._should_stop = threading.Event() if isinstance(actions, dict) or isinstance(actions, list): self.actions = Procedure.build(name=name + '__Cron', _async=False, requests=actions) @@ -35,6 +35,9 @@ class Cronjob(Thread): def run(self): self.state = CronjobState.WAIT self.wait() + if self.should_stop(): + return + self.state = CronjobState.RUNNING try: @@ -56,7 +59,7 @@ class Cronjob(Thread): now = int(time.time()) cron = croniter.croniter(self.cron_expression, now) next_run = int(cron.get_next()) - time.sleep(next_run - now) + self._should_stop.wait(next_run - now) def should_run(self): now = int(time.time()) @@ -64,12 +67,19 @@ class Cronjob(Thread): next_run = int(cron.get_next()) return now == next_run + def stop(self): + self._should_stop.set() -class CronScheduler(Thread): + def should_stop(self): + return self._should_stop.is_set() + + +class CronScheduler(threading.Thread): def __init__(self, jobs): super().__init__() self.jobs_config = jobs self._jobs = {} + self._should_stop = threading.Event() logger.info('Cron scheduler initialized with {} jobs'. format(len(self.jobs_config.keys()))) @@ -90,10 +100,18 @@ class CronScheduler(Thread): return self._jobs[name] + def stop(self): + for job in self._jobs.values(): + job.stop() + self._should_stop.set() + + def should_stop(self): + return self._should_stop.is_set() + def run(self): logger.info('Running cron scheduler') - while True: + while not self.should_stop(): for (job_name, job_config) in self.jobs_config.items(): job = self._get_job(name=job_name, config=job_config) if job.state == CronjobState.IDLE: @@ -101,5 +119,7 @@ class CronScheduler(Thread): time.sleep(0.5) + logger.info('Terminating cron scheduler') + # vim:sw=4:ts=4:et: diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index 968db68f14..3a0f83c88f 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -2,7 +2,6 @@ import copy import json import random import re -import threading import time from datetime import date @@ -199,29 +198,6 @@ class EventMatchResult(object): self.parsed_args = {} if not parsed_args else parsed_args -# XXX Should be a stop Request, not an Event -class StopEvent(Event): - """ StopEvent message. When received on a Bus, it will terminate the - listening thread having the specified ID. Useful to keep listeners in - sync and make them quit when the application terminates """ - - def __init__(self, target, origin, thread_id, id=None, **kwargs): - """ Constructor. - :param target: Target node - :param origin: Origin node - :param thread_id: thread_iden() to be terminated if listening on the bus - :param id: Event ID (default: auto-generated) - :param kwargs: Extra key-value arguments - """ - - super().__init__(target=target, origin=origin, id=id, - thread_id=thread_id, **kwargs) - - def targets_me(self): - """ Returns true if the stop event is for the current thread """ - return self.args['thread_id'] == threading.get_ident() - - def flatten(args): if isinstance(args, dict): for (key, value) in args.items():