From 8e2d4d0bce8ec9774aac15b6d700b33239fb2e11 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Jul 2021 11:33:48 +0200 Subject: [PATCH] Make sure that the accept() in backend.tcp does not block the process --- platypush/backend/tcp.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index 996626cb4..0224c6a33 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -1,5 +1,8 @@ +import multiprocessing +import queue import socket import threading +from typing import Optional from platypush.backend import Backend from platypush.message import Message @@ -31,6 +34,8 @@ class TcpBackend(Backend): self.port = port self.bind_address = bind_address or '0.0.0.0' self.listen_queue = listen_queue + self._accept_queue = multiprocessing.Queue() + self._srv: Optional[multiprocessing.Process] = None def _process_client(self, sock, address): def _f(): @@ -89,6 +94,14 @@ class TcpBackend(Backend): threading.Thread(target=_f_wrapper, name='TCPListener').start() + def _accept_process(self, serv_sock: socket.socket): + while not self.should_stop(): + try: + (sock, address) = serv_sock.accept() + self._accept_queue.put((sock, address)) + except socket.timeout: + continue + def run(self): super().run() self.register_service(port=self.port) @@ -102,16 +115,23 @@ class TcpBackend(Backend): format(self.port, self.bind_address)) serv_sock.listen(self.listen_queue) + self._srv = multiprocessing.Process(target=self._accept_process, args=(serv_sock,)) + self._srv.start() while not self.should_stop(): try: - (sock, address) = serv_sock.accept() - except socket.timeout: + sock, address = self._accept_queue.get(timeout=1) + except (socket.timeout, queue.Empty): continue self.logger.info('Accepted connection from client {}'.format(address[0])) self._process_client(sock, address) + if self._srv: + self._srv.kill() + self._srv.join() + self._srv = None + self.logger.info('TCP backend terminated')