diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index 996626cb4..0224c6a33 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -1,5 +1,8 @@ +import multiprocessing +import queue import socket import threading +from typing import Optional from platypush.backend import Backend from platypush.message import Message @@ -31,6 +34,8 @@ class TcpBackend(Backend): self.port = port self.bind_address = bind_address or '0.0.0.0' self.listen_queue = listen_queue + self._accept_queue = multiprocessing.Queue() + self._srv: Optional[multiprocessing.Process] = None def _process_client(self, sock, address): def _f(): @@ -89,6 +94,14 @@ class TcpBackend(Backend): threading.Thread(target=_f_wrapper, name='TCPListener').start() + def _accept_process(self, serv_sock: socket.socket): + while not self.should_stop(): + try: + (sock, address) = serv_sock.accept() + self._accept_queue.put((sock, address)) + except socket.timeout: + continue + def run(self): super().run() self.register_service(port=self.port) @@ -102,16 +115,23 @@ class TcpBackend(Backend): format(self.port, self.bind_address)) serv_sock.listen(self.listen_queue) + self._srv = multiprocessing.Process(target=self._accept_process, args=(serv_sock,)) + self._srv.start() while not self.should_stop(): try: - (sock, address) = serv_sock.accept() - except socket.timeout: + sock, address = self._accept_queue.get(timeout=1) + except (socket.timeout, queue.Empty): continue self.logger.info('Accepted connection from client {}'.format(address[0])) self._process_client(sock, address) + if self._srv: + self._srv.kill() + self._srv.join() + self._srv = None + self.logger.info('TCP backend terminated')