diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 9999d029..3aafa7ee 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -18,6 +18,7 @@ from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.request import Request from platypush.message.response import Response +from platypush.utils import get_redis_queue_name_by_message class Backend(Thread): @@ -232,6 +233,33 @@ class Backend(Thread): def should_stop(self): return self._stop + def _get_redis(self): + import redis + + redis_backend = get_backend('redis') + if not redis_backend: + self.logger.warning('Redis backend not configured - some ' + + 'web server features may not be working properly') + redis_args = {} + else: + redis_args = redis_backend.redis_args + + redis = redis.Redis(**redis_args) + return redis + + def get_message_response(self, msg): + try: + redis = self._get_redis() + response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) + if response and response[1]: + response = Message.build(response[1]) + else: + response = None + + return response + except Exception as e: + self.logger.error('Error while processing response to {}: {}'.format(msg, str(e))) + # vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 1e8bc31c..2f160184 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -20,7 +20,6 @@ from platypush.message import Message from platypush.message.event import Event, StopEvent from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.request import Request -from platypush.utils import get_redis_queue_name_by_message from .. import Backend @@ -117,19 +116,6 @@ class HttpBackend(Backend): self.active_websockets = set() - def _get_redis(self): - redis_backend = get_backend('redis') - if not redis_backend: - self.logger.warning('Redis backend not configured - some ' + - 'web server features may not be working properly') - redis_args = {} - else: - redis_args = redis_backend.redis_args - - redis = Redis(**redis_args) - return redis - - def send_message(self, msg): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') @@ -237,16 +223,10 @@ class HttpBackend(Backend): msg.backend = self msg.origin = 'http' - redis = self._get_redis() self.bus.post(msg) if isinstance(msg, Request): - response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) - if response and response[1]: - response = Message.build(json.loads(response[1].decode('utf-8'))) - else: - response = None - + response = self.get_message_response(msg) self.logger.info('Processing response on the HTTP backend: {}'.format(response)) if response: return str(response) diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py new file mode 100644 index 00000000..37faf63e --- /dev/null +++ b/platypush/backend/tcp.py @@ -0,0 +1,111 @@ +import redis +import socket +import threading + +from platypush.backend import Backend +from platypush.message import Message +from platypush.message.request import Request + + +class TcpBackend(Backend): + """ + Backend that reads messages from a configured TCP port + """ + + # Maximum length of a request to be processed + _MAX_REQ_SIZE = 2048 + + def __init__(self, port, bind_address=None, listen_queue=5, *args, **kwargs): + """ + :param port: TCP port number + :type port: int + + :param bind_address: Specify a bind address if you want to hook the service to a specific interface (default: listen for any connections) + :type bind_address: str + + :param listen_queue: Maximum number of queued connections (default: 5) + :type listen_queue: int + """ + + super().__init__(*args, **kwargs) + + self.port = port + self.bind_address = bind_address or '0.0.0.0' + self.listen_queue = listen_queue + + def _process_client(self, sock, address): + def _f(): + processed_bytes = 0 + open_brackets = 0 + msg = b'' + prev_ch = None + redis = self._get_redis() + + while True: + if processed_bytes > self._MAX_REQ_SIZE: + self.logger.warning('Ignoring message longer than {} bytes from {}' + .format(self._MAX_REQ_SIZE, address[0])) + return + + ch = sock.recv(1) + processed_bytes += 1 + + if ch == b'': + break + + if ch == b'{' and prev_ch != b'\\': + open_brackets += 1 + + if not open_brackets: + continue + + msg += ch + + if ch == b'}' and prev_ch != b'\\': + open_brackets -= 1 + + if not open_brackets: + break + + prev_ch = ch + + if msg == b'': + return + + msg = Message.build(msg) + self.logger.info('Received request from {}: {}'.format(msg, address[0])) + self.on_message(msg) + + response = self.get_message_response(msg) + self.logger.info('Processing response on the TCP backend: {}'.format(response)) + + if response: + sock.send(str(response).encode()) + + def _f_wrapper(): + try: + _f() + finally: + sock.close() + + threading.Thread(target=_f_wrapper).run() + + def run(self): + super().run() + + serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + serv_sock.bind((self.bind_address, self.port)) + + self.logger.info('Initialized TCP backend on port {} with bind address {}'. + format(self.port, self.bind_address)) + + serv_sock.listen(self.listen_queue) + + while not self.should_stop(): + (sock, address) = serv_sock.accept() + self.logger.info('Accepted connection from client {}'.format(address[0])) + self._process_client(sock, address) + + +# vim:sw=4:ts=4:et: +