Make sure that the Redis thread gets the stop event propagated when the application terminates

This commit is contained in:
Fabio Manganiello 2018-06-26 00:57:26 +02:00
parent 8e16f31603
commit e6fdcaa068

View file

@ -7,14 +7,14 @@ import os
import re import re
import time import time
from threading import Thread from threading import Thread, get_ident
from multiprocessing import Process from multiprocessing import Process
from flask import Flask, abort, jsonify, request as http_request, render_template, send_from_directory from flask import Flask, abort, jsonify, request as http_request, render_template, send_from_directory
from redis import Redis from redis import Redis
from platypush.config import Config from platypush.config import Config
from platypush.message import Message from platypush.message import Message
from platypush.message.event import Event from platypush.message.event import Event, StopEvent
from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.event.web.widget import WidgetUpdateEvent
from platypush.message.request import Request from platypush.message.request import Request
@ -107,6 +107,7 @@ class HttpBackend(Backend):
self.server_proc = None self.server_proc = None
self.disable_websocket = disable_websocket self.disable_websocket = disable_websocket
self.websocket_thread = None self.websocket_thread = None
self.redis_thread = None
self.active_websockets = set() self.active_websockets = set()
self.redis = Redis() self.redis = Redis()
@ -119,11 +120,16 @@ class HttpBackend(Backend):
""" Stop the web server """ """ Stop the web server """
self.logger.info('Received STOP event on HttpBackend') self.logger.info('Received STOP event on HttpBackend')
if self.redis_thread:
stop_evt = StopEvent(target=self.device_id, origin=self.device_id,
thread_id=self.redis_thread.ident)
self.redis.rpush(self.redis_queue, stop_evt)
if self.server_proc: if self.server_proc:
self.server_proc.terminate() self.server_proc.terminate()
self.server_proc.join() self.server_proc.join()
def notify_web_clients(self, event): def notify_web_clients(self, event):
""" Notify all the connected web clients (over websocket) of a new event """ """ Notify all the connected web clients (over websocket) of a new event """
import websockets import websockets
@ -142,9 +148,15 @@ class HttpBackend(Backend):
def redis_poll(self): def redis_poll(self):
""" Polls for new messages on the internal Redis queue """ """ Polls for new messages on the internal Redis queue """
while not self.should_stop():
while True:
msg = self.redis.blpop(self.redis_queue) msg = self.redis.blpop(self.redis_queue)
msg = Message.build(json.loads(msg[1].decode('utf-8'))) msg = Message.build(json.loads(msg[1].decode('utf-8')))
if isinstance(msg, StopEvent) and \
msg.args.get('thread_id') == get_ident():
break
self.bus.post(msg) self.bus.post(msg)
@ -154,7 +166,8 @@ class HttpBackend(Backend):
template_dir = os.path.join(basedir, 'templates') template_dir = os.path.join(basedir, 'templates')
static_dir = os.path.join(basedir, 'static') static_dir = os.path.join(basedir, 'static')
app = Flask(__name__, template_folder=template_dir) app = Flask(__name__, template_folder=template_dir)
Thread(target=self.redis_poll).start() self.redis_thread = Thread(target=self.redis_poll)
self.redis_thread.start()
@app.route('/execute', methods=['POST']) @app.route('/execute', methods=['POST'])
def execute(): def execute():