From e6fdcaa0680eb87ce49be341e374b3c42e2517ca Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 26 Jun 2018 00:57:26 +0200 Subject: [PATCH] Make sure that the Redis thread gets the stop event propagated when the application terminates --- platypush/backend/http/__init__.py | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index a2dce9e3f..10a0e0ea3 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -7,14 +7,14 @@ import os import re import time -from threading import Thread +from threading import Thread, get_ident from multiprocessing import Process from flask import Flask, abort, jsonify, request as http_request, render_template, send_from_directory from redis import Redis from platypush.config import Config from platypush.message import Message -from platypush.message.event import Event +from platypush.message.event import Event, StopEvent from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.request import Request @@ -107,6 +107,7 @@ class HttpBackend(Backend): self.server_proc = None self.disable_websocket = disable_websocket self.websocket_thread = None + self.redis_thread = None self.active_websockets = set() self.redis = Redis() @@ -119,11 +120,16 @@ class HttpBackend(Backend): """ Stop the web server """ self.logger.info('Received STOP event on HttpBackend') + if self.redis_thread: + stop_evt = StopEvent(target=self.device_id, origin=self.device_id, + thread_id=self.redis_thread.ident) + + self.redis.rpush(self.redis_queue, stop_evt) + if self.server_proc: self.server_proc.terminate() self.server_proc.join() - def notify_web_clients(self, event): """ Notify all the connected web clients (over websocket) of a new event """ import websockets @@ -142,9 +148,15 @@ class HttpBackend(Backend): def redis_poll(self): """ Polls for new messages on the internal Redis queue """ - while not self.should_stop(): + + while True: msg = self.redis.blpop(self.redis_queue) msg = Message.build(json.loads(msg[1].decode('utf-8'))) + + if isinstance(msg, StopEvent) and \ + msg.args.get('thread_id') == get_ident(): + break + self.bus.post(msg) @@ -154,7 +166,8 @@ class HttpBackend(Backend): template_dir = os.path.join(basedir, 'templates') static_dir = os.path.join(basedir, 'static') app = Flask(__name__, template_folder=template_dir) - Thread(target=self.redis_poll).start() + self.redis_thread = Thread(target=self.redis_poll) + self.redis_thread.start() @app.route('/execute', methods=['POST']) def execute():