diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 3e03ba6b41..7efb6572b8 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -1,10 +1,12 @@ +import multiprocessing import os import pathlib import secrets import threading +import time from multiprocessing import Process -from typing import Mapping, Optional +from typing import List, Mapping, Optional from tornado.wsgi import WSGIContainer from tornado.web import Application, FallbackHandler @@ -186,6 +188,7 @@ class HttpBackend(Backend): """ _DEFAULT_HTTP_PORT = 8008 + """The default listen port for the webserver.""" def __init__( self, @@ -193,6 +196,7 @@ class HttpBackend(Backend): bind_address: str = '0.0.0.0', resource_dirs: Optional[Mapping[str, str]] = None, secret_key_file: Optional[str] = None, + num_workers: Optional[int] = None, **kwargs, ): """ @@ -204,12 +208,13 @@ class HttpBackend(Backend): the value is the absolute path to expose. :param secret_key_file: Path to the file containing the secret key that will be used by Flask (default: ``~/.local/share/platypush/flask.secret.key``). + :param num_workers: Number of worker processes to use (default: ``(cpu_count * 2) + 1``). """ super().__init__(**kwargs) self.port = port - self.server_proc = None + self._workers: List[Process] = [] self._service_registry_thread = None self.bind_address = bind_address self._io_loop: Optional[IOLoop] = None @@ -227,6 +232,7 @@ class HttpBackend(Backend): or os.path.join(Config.get('workdir'), 'flask.secret.key') # type: ignore ) self.local_base_url = f'http://localhost:{self.port}' + self.num_workers = num_workers or (multiprocessing.cpu_count() * 2) + 1 def send_message(self, *_, **__): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') @@ -240,18 +246,27 @@ class HttpBackend(Backend): self._io_loop.stop() self._io_loop.close() - if self.server_proc: - self.server_proc.terminate() - self.server_proc.join(timeout=10) - if self.server_proc.is_alive(): - self.server_proc.kill() - if self.server_proc.is_alive(): + workers = self._workers.copy() + for worker in workers: + worker.terminate() + + stop_timeout = 5 + wait_start_time = time.time() + wait_max_time = wait_start_time + stop_timeout + + for i, worker in enumerate(workers[::-1]): + worker.join(timeout=max(0, wait_max_time - time.time())) + if worker.is_alive(): + worker.kill() + + if worker.is_alive(): self.logger.info( - 'HTTP server process may be still alive at termination' + 'HTTP worker %s may be still alive at termination', worker.name ) else: - self.logger.info('HTTP server process terminated') + self._workers.pop(i) + self.logger.info('HTTP workers terminated') if self._service_registry_thread and self._service_registry_thread.is_alive(): self._service_registry_thread.join(timeout=5) self._service_registry_thread = None @@ -282,33 +297,28 @@ class HttpBackend(Backend): raise e def _web_server_proc(self): - def proc(): - self.logger.info('Starting local web server on port %s', self.port) - assert isinstance( - self.bus, RedisBus - ), 'The HTTP backend only works if backed by a Redis bus' + assert isinstance( + self.bus, RedisBus + ), 'The HTTP backend only works if backed by a Redis bus' - application.config['redis_queue'] = self.bus.redis_queue - application.secret_key = self._get_secret_key() + application.config['redis_queue'] = self.bus.redis_queue + application.secret_key = self._get_secret_key() + container = WSGIContainer(application) + server = Application( + [ + *[(route.path(), route) for route in get_ws_routes()], + (r'.*', FallbackHandler, {'fallback': container}), + ] + ) - container = WSGIContainer(application) - server = Application( - [ - *[(route.path(), route) for route in get_ws_routes()], - (r'.*', FallbackHandler, {'fallback': container}), - ] - ) + server.listen(address=self.bind_address, port=self.port, reuse_port=True) + self._io_loop = IOLoop.instance() - server.listen(address=self.bind_address, port=self.port) - self._io_loop = IOLoop.instance() - - try: - self._io_loop.start() - except Exception as e: - if not self.should_stop(): - raise e - - return proc + try: + self._io_loop.start() + except Exception as e: + if not self.should_stop(): + raise e def _register_service(self): try: @@ -325,9 +335,22 @@ class HttpBackend(Backend): self._service_registry_thread.start() def _run_web_server(self): - self.server_proc = Process(target=self._web_server_proc(), name='WebServer') - self.server_proc.start() - self.server_proc.join() + self.logger.info( + 'Starting local web server on port %s with %d service workers', + self.port, + self.num_workers, + ) + + workers = [ + Process(target=self._web_server_proc, name=f'WebWorker#{i + 1}') + for i in range(self.num_workers) + ] + + for worker in workers: + worker.start() + + for worker in workers: + worker.join() def run(self): super().run()