Added a multi-worker approach to the Tornado WSGI container.

The WSGI container is a good option to wrap a multi-modal webapp
(Flask + websocket routes), but it's constrained to a single-process
approach and queued/pre-buffered requests. That makes performance poor
when handling requests that may take a few seconds to complete.
This commit is contained in:
Fabio Manganiello 2023-05-13 01:26:18 +02:00
parent b7b93edbae
commit 71401a4936
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 60 additions and 37 deletions

View File

@ -1,10 +1,12 @@
import multiprocessing
import os
import pathlib
import secrets
import threading
import time
from multiprocessing import Process
from typing import Mapping, Optional
from typing import List, Mapping, Optional
from tornado.wsgi import WSGIContainer
from tornado.web import Application, FallbackHandler
@ -186,6 +188,7 @@ class HttpBackend(Backend):
"""
_DEFAULT_HTTP_PORT = 8008
"""The default listen port for the webserver."""
def __init__(
self,
@ -193,6 +196,7 @@ class HttpBackend(Backend):
bind_address: str = '0.0.0.0',
resource_dirs: Optional[Mapping[str, str]] = None,
secret_key_file: Optional[str] = None,
num_workers: Optional[int] = None,
**kwargs,
):
"""
@ -204,12 +208,13 @@ class HttpBackend(Backend):
the value is the absolute path to expose.
:param secret_key_file: Path to the file containing the secret key that will be used by Flask
(default: ``~/.local/share/platypush/flask.secret.key``).
:param num_workers: Number of worker processes to use (default: ``(cpu_count * 2) + 1``).
"""
super().__init__(**kwargs)
self.port = port
self.server_proc = None
self._workers: List[Process] = []
self._service_registry_thread = None
self.bind_address = bind_address
self._io_loop: Optional[IOLoop] = None
@ -227,6 +232,7 @@ class HttpBackend(Backend):
or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore
)
self.local_base_url = f'http://localhost:{self.port}'
self.num_workers = num_workers or (multiprocessing.cpu_count() * 2) + 1
def send_message(self, *_, **__):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
@ -240,18 +246,27 @@ class HttpBackend(Backend):
self._io_loop.stop()
self._io_loop.close()
if self.server_proc:
self.server_proc.terminate()
self.server_proc.join(timeout=10)
if self.server_proc.is_alive():
self.server_proc.kill()
if self.server_proc.is_alive():
workers = self._workers.copy()
for worker in workers:
worker.terminate()
stop_timeout = 5
wait_start_time = time.time()
wait_max_time = wait_start_time + stop_timeout
for i, worker in enumerate(workers[::-1]):
worker.join(timeout=max(0, wait_max_time - time.time()))
if worker.is_alive():
worker.kill()
if worker.is_alive():
self.logger.info(
'HTTP server process may be still alive at termination'
'HTTP worker %s may be still alive at termination', worker.name
)
else:
self.logger.info('HTTP server process terminated')
self._workers.pop(i)
self.logger.info('HTTP workers terminated')
if self._service_registry_thread and self._service_registry_thread.is_alive():
self._service_registry_thread.join(timeout=5)
self._service_registry_thread = None
@ -282,33 +297,28 @@ class HttpBackend(Backend):
raise e
def _web_server_proc(self):
def proc():
self.logger.info('Starting local web server on port %s', self.port)
assert isinstance(
self.bus, RedisBus
), 'The HTTP backend only works if backed by a Redis bus'
assert isinstance(
self.bus, RedisBus
), 'The HTTP backend only works if backed by a Redis bus'
application.config['redis_queue'] = self.bus.redis_queue
application.secret_key = self._get_secret_key()
application.config['redis_queue'] = self.bus.redis_queue
application.secret_key = self._get_secret_key()
container = WSGIContainer(application)
server = Application(
[
*[(route.path(), route) for route in get_ws_routes()],
(r'.*', FallbackHandler, {'fallback': container}),
]
)
container = WSGIContainer(application)
server = Application(
[
*[(route.path(), route) for route in get_ws_routes()],
(r'.*', FallbackHandler, {'fallback': container}),
]
)
server.listen(address=self.bind_address, port=self.port, reuse_port=True)
self._io_loop = IOLoop.instance()
server.listen(address=self.bind_address, port=self.port)
self._io_loop = IOLoop.instance()
try:
self._io_loop.start()
except Exception as e:
if not self.should_stop():
raise e
return proc
try:
self._io_loop.start()
except Exception as e:
if not self.should_stop():
raise e
def _register_service(self):
try:
@ -325,9 +335,22 @@ class HttpBackend(Backend):
self._service_registry_thread.start()
def _run_web_server(self):
self.server_proc = Process(target=self._web_server_proc(), name='WebServer')
self.server_proc.start()
self.server_proc.join()
self.logger.info(
'Starting local web server on port %s with %d service workers',
self.port,
self.num_workers,
)
workers = [
Process(target=self._web_server_proc, name=f'WebWorker#{i + 1}')
for i in range(self.num_workers)
]
for worker in workers:
worker.start()
for worker in workers:
worker.join()
def run(self):
super().run()