From ac4fe4447ec4ef2153ba6150a94b44c1b21ddc74 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 13 May 2023 02:36:20 +0200 Subject: [PATCH] Revert "Added a multi-worker approach to the Tornado WSGI container." This reverts commit 71401a4936f2b42b6af05e2916c3710df1c2dac2. Temporarily reverted this commit because the `reuse_address` on the application's `listen` method has only been implemented in Tornado 6.2 - and Debian stable still shipts Tornado 6.1. --- platypush/backend/http/__init__.py | 97 ++++++++++++------------------ 1 file changed, 37 insertions(+), 60 deletions(-) diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 7efb6572b8..3e03ba6b41 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -1,12 +1,10 @@ -import multiprocessing import os import pathlib import secrets import threading -import time from multiprocessing import Process -from typing import List, Mapping, Optional +from typing import Mapping, Optional from tornado.wsgi import WSGIContainer from tornado.web import Application, FallbackHandler @@ -188,7 +186,6 @@ class HttpBackend(Backend): """ _DEFAULT_HTTP_PORT = 8008 - """The default listen port for the webserver.""" def __init__( self, @@ -196,7 +193,6 @@ class HttpBackend(Backend): bind_address: str = '0.0.0.0', resource_dirs: Optional[Mapping[str, str]] = None, secret_key_file: Optional[str] = None, - num_workers: Optional[int] = None, **kwargs, ): """ @@ -208,13 +204,12 @@ class HttpBackend(Backend): the value is the absolute path to expose. :param secret_key_file: Path to the file containing the secret key that will be used by Flask (default: ``~/.local/share/platypush/flask.secret.key``). - :param num_workers: Number of worker processes to use (default: ``(cpu_count * 2) + 1``). """ super().__init__(**kwargs) self.port = port - self._workers: List[Process] = [] + self.server_proc = None self._service_registry_thread = None self.bind_address = bind_address self._io_loop: Optional[IOLoop] = None @@ -232,7 +227,6 @@ class HttpBackend(Backend): or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore ) self.local_base_url = f'http://localhost:{self.port}' - self.num_workers = num_workers or (multiprocessing.cpu_count() * 2) + 1 def send_message(self, *_, **__): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') @@ -246,27 +240,18 @@ class HttpBackend(Backend): self._io_loop.stop() self._io_loop.close() - workers = self._workers.copy() - for worker in workers: - worker.terminate() - - stop_timeout = 5 - wait_start_time = time.time() - wait_max_time = wait_start_time + stop_timeout - - for i, worker in enumerate(workers[::-1]): - worker.join(timeout=max(0, wait_max_time - time.time())) - if worker.is_alive(): - worker.kill() - - if worker.is_alive(): + if self.server_proc: + self.server_proc.terminate() + self.server_proc.join(timeout=10) + if self.server_proc.is_alive(): + self.server_proc.kill() + if self.server_proc.is_alive(): self.logger.info( - 'HTTP worker %s may be still alive at termination', worker.name + 'HTTP server process may be still alive at termination' ) else: - self._workers.pop(i) + self.logger.info('HTTP server process terminated') - self.logger.info('HTTP workers terminated') if self._service_registry_thread and self._service_registry_thread.is_alive(): self._service_registry_thread.join(timeout=5) self._service_registry_thread = None @@ -297,28 +282,33 @@ class HttpBackend(Backend): raise e def _web_server_proc(self): - assert isinstance( - self.bus, RedisBus - ), 'The HTTP backend only works if backed by a Redis bus' + def proc(): + self.logger.info('Starting local web server on port %s', self.port) + assert isinstance( + self.bus, RedisBus + ), 'The HTTP backend only works if backed by a Redis bus' - application.config['redis_queue'] = self.bus.redis_queue - application.secret_key = self._get_secret_key() - container = WSGIContainer(application) - server = Application( - [ - *[(route.path(), route) for route in get_ws_routes()], - (r'.*', FallbackHandler, {'fallback': container}), - ] - ) + application.config['redis_queue'] = self.bus.redis_queue + application.secret_key = self._get_secret_key() - server.listen(address=self.bind_address, port=self.port, reuse_port=True) - self._io_loop = IOLoop.instance() + container = WSGIContainer(application) + server = Application( + [ + *[(route.path(), route) for route in get_ws_routes()], + (r'.*', FallbackHandler, {'fallback': container}), + ] + ) - try: - self._io_loop.start() - except Exception as e: - if not self.should_stop(): - raise e + server.listen(address=self.bind_address, port=self.port) + self._io_loop = IOLoop.instance() + + try: + self._io_loop.start() + except Exception as e: + if not self.should_stop(): + raise e + + return proc def _register_service(self): try: @@ -335,22 +325,9 @@ class HttpBackend(Backend): self._service_registry_thread.start() def _run_web_server(self): - self.logger.info( - 'Starting local web server on port %s with %d service workers', - self.port, - self.num_workers, - ) - - workers = [ - Process(target=self._web_server_proc, name=f'WebWorker#{i + 1}') - for i in range(self.num_workers) - ] - - for worker in workers: - worker.start() - - for worker in workers: - worker.join() + self.server_proc = Process(target=self._web_server_proc(), name='WebServer') + self.server_proc.start() + self.server_proc.join() def run(self): super().run()