Changed the Tornado paradigm to start the WSGI workers.

Use `bind_sockets`/`fork_processes` instead of reinventing the wheel
with our own multiprocessing handling.
This commit is contained in:
Fabio Manganiello 2023-05-13 12:35:20 +02:00
parent ac4fe4447e
commit 72797e73ff
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 79 additions and 55 deletions

View file

@ -1,14 +1,18 @@
import asyncio
import os import os
import pathlib import pathlib
import secrets import secrets
import threading import threading
from multiprocessing import Process from multiprocessing import Process
from typing import Mapping, Optional from time import time
from typing import List, Mapping, Optional
from tornado.httpserver import HTTPServer
from tornado.netutil import bind_sockets
from tornado.process import cpu_count, fork_processes
from tornado.wsgi import WSGIContainer from tornado.wsgi import WSGIContainer
from tornado.web import Application, FallbackHandler from tornado.web import Application, FallbackHandler
from tornado.ioloop import IOLoop
from platypush.backend import Backend from platypush.backend import Backend
from platypush.backend.http.app import application from platypush.backend.http.app import application
@ -186,6 +190,7 @@ class HttpBackend(Backend):
""" """
_DEFAULT_HTTP_PORT = 8008 _DEFAULT_HTTP_PORT = 8008
"""The default listen port for the webserver."""
def __init__( def __init__(
self, self,
@ -193,6 +198,7 @@ class HttpBackend(Backend):
bind_address: str = '0.0.0.0', bind_address: str = '0.0.0.0',
resource_dirs: Optional[Mapping[str, str]] = None, resource_dirs: Optional[Mapping[str, str]] = None,
secret_key_file: Optional[str] = None, secret_key_file: Optional[str] = None,
num_workers: Optional[int] = None,
**kwargs, **kwargs,
): ):
""" """
@ -204,15 +210,16 @@ class HttpBackend(Backend):
the value is the absolute path to expose. the value is the absolute path to expose.
:param secret_key_file: Path to the file containing the secret key that will be used by Flask :param secret_key_file: Path to the file containing the secret key that will be used by Flask
(default: ``~/.local/share/platypush/flask.secret.key``). (default: ``~/.local/share/platypush/flask.secret.key``).
:param num_workers: Number of worker processes to use (default: ``(cpu_count * 2) + 1``).
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.port = port self.port = port
self.server_proc = None self._server_proc: Optional[Process] = None
self._workers: List[Process] = []
self._service_registry_thread = None self._service_registry_thread = None
self.bind_address = bind_address self.bind_address = bind_address
self._io_loop: Optional[IOLoop] = None
if resource_dirs: if resource_dirs:
self.resource_dirs = { self.resource_dirs = {
@ -227,6 +234,7 @@ class HttpBackend(Backend):
or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore
) )
self.local_base_url = f'http://localhost:{self.port}' self.local_base_url = f'http://localhost:{self.port}'
self.num_workers = num_workers or (cpu_count() * 2) + 1
def send_message(self, *_, **__): def send_message(self, *_, **__):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
@ -236,28 +244,34 @@ class HttpBackend(Backend):
super().on_stop() super().on_stop()
self.logger.info('Received STOP event on HttpBackend') self.logger.info('Received STOP event on HttpBackend')
if self._io_loop: start_time = time()
self._io_loop.stop() timeout = 5
self._io_loop.close() workers = self._workers.copy()
if self.server_proc: for i, worker in enumerate(workers[::-1]):
self.server_proc.terminate() if worker and worker.is_alive():
self.server_proc.join(timeout=10) worker.terminate()
if self.server_proc.is_alive(): worker.join(timeout=max(0, start_time + timeout - time()))
self.server_proc.kill()
if self.server_proc.is_alive(): if worker and worker.is_alive():
self.logger.info( worker.kill()
'HTTP server process may be still alive at termination' self._workers.pop(i)
)
else: if self._server_proc:
self.logger.info('HTTP server process terminated') self._server_proc.terminate()
self._server_proc.join(timeout=5)
self._server_proc = None
if self._server_proc and self._server_proc.is_alive():
self._server_proc.kill()
self._server_proc = None
self.logger.info('HTTP server terminated')
if self._service_registry_thread and self._service_registry_thread.is_alive(): if self._service_registry_thread and self._service_registry_thread.is_alive():
self._service_registry_thread.join(timeout=5) self._service_registry_thread.join(timeout=5)
self._service_registry_thread = None self._service_registry_thread = None
self._io_loop = None
def notify_web_clients(self, event): def notify_web_clients(self, event):
"""Notify all the connected web clients (over websocket) of a new event""" """Notify all the connected web clients (over websocket) of a new event"""
get_redis().publish(events_redis_topic, str(event)) get_redis().publish(events_redis_topic, str(event))
@ -281,35 +295,6 @@ class HttpBackend(Backend):
raise e raise e
def _web_server_proc(self):
def proc():
self.logger.info('Starting local web server on port %s', self.port)
assert isinstance(
self.bus, RedisBus
), 'The HTTP backend only works if backed by a Redis bus'
application.config['redis_queue'] = self.bus.redis_queue
application.secret_key = self._get_secret_key()
container = WSGIContainer(application)
server = Application(
[
*[(route.path(), route) for route in get_ws_routes()],
(r'.*', FallbackHandler, {'fallback': container}),
]
)
server.listen(address=self.bind_address, port=self.port)
self._io_loop = IOLoop.instance()
try:
self._io_loop.start()
except Exception as e:
if not self.should_stop():
raise e
return proc
def _register_service(self): def _register_service(self):
try: try:
self.register_service(port=self.port) self.register_service(port=self.port)
@ -324,16 +309,55 @@ class HttpBackend(Backend):
) )
self._service_registry_thread.start() self._service_registry_thread.start()
def _run_web_server(self): async def _post_fork_main(self, sockets):
self.server_proc = Process(target=self._web_server_proc(), name='WebServer') assert isinstance(
self.server_proc.start() self.bus, RedisBus
self.server_proc.join() ), 'The HTTP backend only works if backed by a Redis bus'
application.config['redis_queue'] = self.bus.redis_queue
application.secret_key = self._get_secret_key()
container = WSGIContainer(application)
tornado_app = Application(
[
*[(route.path(), route) for route in get_ws_routes()],
(r'.*', FallbackHandler, {'fallback': container}),
]
)
server = HTTPServer(tornado_app)
server.add_sockets(sockets)
try:
await asyncio.Event().wait()
except (asyncio.CancelledError, KeyboardInterrupt):
return
def _web_server_proc(self):
self.logger.info(
'Starting local web server on port %s with %d service workers',
self.port,
self.num_workers,
)
sockets = bind_sockets(self.port, address=self.bind_address, reuse_port=True)
try:
fork_processes(self.num_workers)
future = self._post_fork_main(sockets)
asyncio.run(future)
except (asyncio.CancelledError, KeyboardInterrupt):
return
def _start_web_server(self):
self._server_proc = Process(target=self._web_server_proc)
self._server_proc.start()
self._server_proc.join()
def run(self): def run(self):
super().run() super().run()
self._start_zeroconf_service() self._start_zeroconf_service()
self._run_web_server() self._start_web_server()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -89,7 +89,7 @@ class WSRoute(WebSocketHandler, Thread, ABC):
continue continue
yield msg.get('data') yield msg.get('data')
except RedisConnectionError: except (AttributeError, RedisConnectionError):
return return
def send(self, msg: Union[str, bytes, dict, list, tuple, set]) -> None: def send(self, msg: Union[str, bytes, dict, list, tuple, set]) -> None: