More consistent flow for messages received by a backend
This commit is contained in:
parent
59d84c4fcb
commit
d215410a6a
3 changed files with 8 additions and 15 deletions
|
@ -114,7 +114,6 @@ class Daemon:
|
|||
|
||||
def stop_app(self):
|
||||
""" Stops the backends and the bus """
|
||||
self.bus.post(ApplicationStoppedEvent())
|
||||
for backend in self.backends.values():
|
||||
backend.stop()
|
||||
self.bus.stop()
|
||||
|
@ -143,6 +142,7 @@ class Daemon:
|
|||
except KeyboardInterrupt:
|
||||
LOGGER.info('SIGINT received, terminating application')
|
||||
finally:
|
||||
self.bus.post(ApplicationStoppedEvent())
|
||||
self.stop_app()
|
||||
|
||||
|
||||
|
|
|
@ -169,19 +169,14 @@ class HttpBackend(Backend):
|
|||
def redis_poll(self):
|
||||
""" Polls for new messages on the internal Redis queue """
|
||||
|
||||
while True:
|
||||
while not self.should_stop():
|
||||
redis = self._get_redis()
|
||||
if not redis:
|
||||
continue
|
||||
|
||||
msg = redis.blpop(self.redis_queue)
|
||||
msg = Message.build(json.loads(msg[1].decode('utf-8')))
|
||||
|
||||
if isinstance(msg, StopEvent) and \
|
||||
msg.args.get('thread_id') == get_ident():
|
||||
break
|
||||
|
||||
self.bus.post(msg)
|
||||
self.on_message(msg)
|
||||
|
||||
|
||||
@classmethod
|
||||
|
@ -243,7 +238,7 @@ class HttpBackend(Backend):
|
|||
msg.origin = 'http'
|
||||
|
||||
redis = self._get_redis()
|
||||
self.bus.post(msg)
|
||||
self.on_message(msg)
|
||||
|
||||
if isinstance(msg, Request):
|
||||
response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60)
|
||||
|
@ -402,16 +397,14 @@ class HttpBackend(Backend):
|
|||
def run(self):
|
||||
super().run()
|
||||
os.putenv('FLASK_APP', 'platypush')
|
||||
os.putenv('FLASK_ENV', 'development')
|
||||
os.putenv('FLASK_ENV', 'production')
|
||||
self.logger.info('Initialized HTTP backend on port {}'.format(self.port))
|
||||
|
||||
webserver = self.webserver()
|
||||
self.server_proc = Process(target=webserver.run, kwargs={
|
||||
'debug':True, 'host':'0.0.0.0', 'port':self.port, 'use_reloader':False
|
||||
'host':'0.0.0.0', 'port':self.port, 'use_reloader':False
|
||||
})
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
self.server_proc.start()
|
||||
|
||||
if not self.disable_websocket:
|
||||
|
|
|
@ -28,12 +28,12 @@ class TestHttp(unittest.TestCase):
|
|||
|
||||
def test_request_exec_flow(self):
|
||||
self.start_daemon()
|
||||
time.sleep(2)
|
||||
time.sleep(1)
|
||||
self.send_request()
|
||||
|
||||
def start_daemon(self):
|
||||
def _f():
|
||||
self.receiver = Daemon(config_file=config_file, requests_to_process=1)
|
||||
self.receiver = Daemon(config_file=config_file)
|
||||
self.receiver.start()
|
||||
|
||||
Thread(target=_f).start()
|
||||
|
|
Loading…
Reference in a new issue