Revert "Added a multi-worker approach to the Tornado WSGI container."

This reverts commit 71401a4936.

Temporarily reverted this commit because the `reuse_address` on the
application's `listen` method has only been implemented in Tornado 6.2 -
and Debian stable still shipts Tornado 6.1.
This commit is contained in:
Fabio Manganiello 2023-05-13 02:36:20 +02:00
parent 71401a4936
commit ac4fe4447e
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 37 additions and 60 deletions

View File

@ -1,12 +1,10 @@
import multiprocessing
import os import os
import pathlib import pathlib
import secrets import secrets
import threading import threading
import time
from multiprocessing import Process from multiprocessing import Process
from typing import List, Mapping, Optional from typing import Mapping, Optional
from tornado.wsgi import WSGIContainer from tornado.wsgi import WSGIContainer
from tornado.web import Application, FallbackHandler from tornado.web import Application, FallbackHandler
@ -188,7 +186,6 @@ class HttpBackend(Backend):
""" """
_DEFAULT_HTTP_PORT = 8008 _DEFAULT_HTTP_PORT = 8008
"""The default listen port for the webserver."""
def __init__( def __init__(
self, self,
@ -196,7 +193,6 @@ class HttpBackend(Backend):
bind_address: str = '0.0.0.0', bind_address: str = '0.0.0.0',
resource_dirs: Optional[Mapping[str, str]] = None, resource_dirs: Optional[Mapping[str, str]] = None,
secret_key_file: Optional[str] = None, secret_key_file: Optional[str] = None,
num_workers: Optional[int] = None,
**kwargs, **kwargs,
): ):
""" """
@ -208,13 +204,12 @@ class HttpBackend(Backend):
the value is the absolute path to expose. the value is the absolute path to expose.
:param secret_key_file: Path to the file containing the secret key that will be used by Flask :param secret_key_file: Path to the file containing the secret key that will be used by Flask
(default: ``~/.local/share/platypush/flask.secret.key``). (default: ``~/.local/share/platypush/flask.secret.key``).
:param num_workers: Number of worker processes to use (default: ``(cpu_count * 2) + 1``).
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.port = port self.port = port
self._workers: List[Process] = [] self.server_proc = None
self._service_registry_thread = None self._service_registry_thread = None
self.bind_address = bind_address self.bind_address = bind_address
self._io_loop: Optional[IOLoop] = None self._io_loop: Optional[IOLoop] = None
@ -232,7 +227,6 @@ class HttpBackend(Backend):
or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore
) )
self.local_base_url = f'http://localhost:{self.port}' self.local_base_url = f'http://localhost:{self.port}'
self.num_workers = num_workers or (multiprocessing.cpu_count() * 2) + 1
def send_message(self, *_, **__): def send_message(self, *_, **__):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
@ -246,27 +240,18 @@ class HttpBackend(Backend):
self._io_loop.stop() self._io_loop.stop()
self._io_loop.close() self._io_loop.close()
workers = self._workers.copy() if self.server_proc:
for worker in workers: self.server_proc.terminate()
worker.terminate() self.server_proc.join(timeout=10)
if self.server_proc.is_alive():
stop_timeout = 5 self.server_proc.kill()
wait_start_time = time.time() if self.server_proc.is_alive():
wait_max_time = wait_start_time + stop_timeout
for i, worker in enumerate(workers[::-1]):
worker.join(timeout=max(0, wait_max_time - time.time()))
if worker.is_alive():
worker.kill()
if worker.is_alive():
self.logger.info( self.logger.info(
'HTTP worker %s may be still alive at termination', worker.name 'HTTP server process may be still alive at termination'
) )
else: else:
self._workers.pop(i) self.logger.info('HTTP server process terminated')
self.logger.info('HTTP workers terminated')
if self._service_registry_thread and self._service_registry_thread.is_alive(): if self._service_registry_thread and self._service_registry_thread.is_alive():
self._service_registry_thread.join(timeout=5) self._service_registry_thread.join(timeout=5)
self._service_registry_thread = None self._service_registry_thread = None
@ -297,28 +282,33 @@ class HttpBackend(Backend):
raise e raise e
def _web_server_proc(self): def _web_server_proc(self):
assert isinstance( def proc():
self.bus, RedisBus self.logger.info('Starting local web server on port %s', self.port)
), 'The HTTP backend only works if backed by a Redis bus' assert isinstance(
self.bus, RedisBus
), 'The HTTP backend only works if backed by a Redis bus'
application.config['redis_queue'] = self.bus.redis_queue application.config['redis_queue'] = self.bus.redis_queue
application.secret_key = self._get_secret_key() application.secret_key = self._get_secret_key()
container = WSGIContainer(application)
server = Application(
[
*[(route.path(), route) for route in get_ws_routes()],
(r'.*', FallbackHandler, {'fallback': container}),
]
)
server.listen(address=self.bind_address, port=self.port, reuse_port=True) container = WSGIContainer(application)
self._io_loop = IOLoop.instance() server = Application(
[
*[(route.path(), route) for route in get_ws_routes()],
(r'.*', FallbackHandler, {'fallback': container}),
]
)
try: server.listen(address=self.bind_address, port=self.port)
self._io_loop.start() self._io_loop = IOLoop.instance()
except Exception as e:
if not self.should_stop(): try:
raise e self._io_loop.start()
except Exception as e:
if not self.should_stop():
raise e
return proc
def _register_service(self): def _register_service(self):
try: try:
@ -335,22 +325,9 @@ class HttpBackend(Backend):
self._service_registry_thread.start() self._service_registry_thread.start()
def _run_web_server(self): def _run_web_server(self):
self.logger.info( self.server_proc = Process(target=self._web_server_proc(), name='WebServer')
'Starting local web server on port %s with %d service workers', self.server_proc.start()
self.port, self.server_proc.join()
self.num_workers,
)
workers = [
Process(target=self._web_server_proc, name=f'WebWorker#{i + 1}')
for i in range(self.num_workers)
]
for worker in workers:
worker.start()
for worker in workers:
worker.join()
def run(self): def run(self):
super().run() super().run()