platypush/platypush/backend/tcp/__init__.py

138 lines
4.1 KiB
Python

import multiprocessing
import queue
import socket
import threading
from typing import Optional
from platypush.backend import Backend
from platypush.message import Message
from platypush.utils import set_thread_name
class TcpBackend(Backend):
"""
Backend that reads messages from a configured TCP port
"""
# Maximum length of a request to be processed
_MAX_REQ_SIZE = 2048
def __init__(self, port, bind_address=None, listen_queue=5, *args, **kwargs):
"""
:param port: TCP port number
:type port: int
:param bind_address: Specify a bind address if you want to hook the service to a specific interface (default: listen for any connections)
:type bind_address: str
:param listen_queue: Maximum number of queued connections (default: 5)
:type listen_queue: int
"""
super().__init__(*args, **kwargs)
self.port = port
self.bind_address = bind_address or '0.0.0.0'
self.listen_queue = listen_queue
self._accept_queue = multiprocessing.Queue()
self._srv: Optional[multiprocessing.Process] = None
def _process_client(self, sock, address):
def _f():
processed_bytes = 0
open_brackets = 0
msg = b''
prev_ch = None
while not self.should_stop():
if processed_bytes > self._MAX_REQ_SIZE:
self.logger.warning('Ignoring message longer than {} bytes from {}'
.format(self._MAX_REQ_SIZE, address[0]))
return
ch = sock.recv(1)
processed_bytes += 1
if ch == b'':
break
if ch == b'{' and prev_ch != b'\\':
open_brackets += 1
if not open_brackets:
continue
msg += ch
if ch == b'}' and prev_ch != b'\\':
open_brackets -= 1
if not open_brackets:
break
prev_ch = ch
if msg == b'':
return
msg = Message.build(msg)
self.logger.info('Received request from {}: {}'.format(msg, address[0]))
self.on_message(msg)
response = self.get_message_response(msg)
self.logger.info('Processing response on the TCP backend: {}'.format(response))
if response:
sock.send(str(response).encode())
def _f_wrapper():
set_thread_name('TCPListener')
try:
_f()
finally:
sock.close()
threading.Thread(target=_f_wrapper, name='TCPListener').start()
def _accept_process(self, serv_sock: socket.socket):
while not self.should_stop():
try:
(sock, address) = serv_sock.accept()
self._accept_queue.put((sock, address))
except socket.timeout:
continue
def run(self):
super().run()
self.register_service(port=self.port)
serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv_sock.bind((self.bind_address, self.port))
serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serv_sock.settimeout(0.5)
self.logger.info('Initialized TCP backend on port {} with bind address {}'.
format(self.port, self.bind_address))
serv_sock.listen(self.listen_queue)
self._srv = multiprocessing.Process(target=self._accept_process, args=(serv_sock,))
self._srv.start()
while not self.should_stop():
try:
sock, address = self._accept_queue.get(timeout=1)
except (socket.timeout, queue.Empty):
continue
self.logger.info('Accepted connection from client {}'.format(address[0]))
self._process_client(sock, address)
if self._srv:
self._srv.kill()
self._srv.join()
self._srv = None
self.logger.info('TCP backend terminated')
# vim:sw=4:ts=4:et: