Implemented TCP backend

This commit is contained in:
Fabio Manganiello 2018-10-11 14:02:26 +00:00
parent 15b65c4896
commit 74d23262b7
3 changed files with 140 additions and 21 deletions

View file

@ -18,6 +18,7 @@ from platypush.message import Message
from platypush.message.event import Event, StopEvent from platypush.message.event import Event, StopEvent
from platypush.message.request import Request from platypush.message.request import Request
from platypush.message.response import Response from platypush.message.response import Response
from platypush.utils import get_redis_queue_name_by_message
class Backend(Thread): class Backend(Thread):
@ -232,6 +233,33 @@ class Backend(Thread):
def should_stop(self): def should_stop(self):
return self._stop return self._stop
def _get_redis(self):
import redis
redis_backend = get_backend('redis')
if not redis_backend:
self.logger.warning('Redis backend not configured - some ' +
'web server features may not be working properly')
redis_args = {}
else:
redis_args = redis_backend.redis_args
redis = redis.Redis(**redis_args)
return redis
def get_message_response(self, msg):
try:
redis = self._get_redis()
response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60)
if response and response[1]:
response = Message.build(response[1])
else:
response = None
return response
except Exception as e:
self.logger.error('Error while processing response to {}: {}'.format(msg, str(e)))
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -20,7 +20,6 @@ from platypush.message import Message
from platypush.message.event import Event, StopEvent from platypush.message.event import Event, StopEvent
from platypush.message.event.web.widget import WidgetUpdateEvent from platypush.message.event.web.widget import WidgetUpdateEvent
from platypush.message.request import Request from platypush.message.request import Request
from platypush.utils import get_redis_queue_name_by_message
from .. import Backend from .. import Backend
@ -117,19 +116,6 @@ class HttpBackend(Backend):
self.active_websockets = set() self.active_websockets = set()
def _get_redis(self):
redis_backend = get_backend('redis')
if not redis_backend:
self.logger.warning('Redis backend not configured - some ' +
'web server features may not be working properly')
redis_args = {}
else:
redis_args = redis_backend.redis_args
redis = Redis(**redis_args)
return redis
def send_message(self, msg): def send_message(self, msg):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
@ -237,16 +223,10 @@ class HttpBackend(Backend):
msg.backend = self msg.backend = self
msg.origin = 'http' msg.origin = 'http'
redis = self._get_redis()
self.bus.post(msg) self.bus.post(msg)
if isinstance(msg, Request): if isinstance(msg, Request):
response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) response = self.get_message_response(msg)
if response and response[1]:
response = Message.build(json.loads(response[1].decode('utf-8')))
else:
response = None
self.logger.info('Processing response on the HTTP backend: {}'.format(response)) self.logger.info('Processing response on the HTTP backend: {}'.format(response))
if response: if response:
return str(response) return str(response)

111
platypush/backend/tcp.py Normal file
View file

@ -0,0 +1,111 @@
import redis
import socket
import threading
from platypush.backend import Backend
from platypush.message import Message
from platypush.message.request import Request
class TcpBackend(Backend):
"""
Backend that reads messages from a configured TCP port
"""
# Maximum length of a request to be processed
_MAX_REQ_SIZE = 2048
def __init__(self, port, bind_address=None, listen_queue=5, *args, **kwargs):
"""
:param port: TCP port number
:type port: int
:param bind_address: Specify a bind address if you want to hook the service to a specific interface (default: listen for any connections)
:type bind_address: str
:param listen_queue: Maximum number of queued connections (default: 5)
:type listen_queue: int
"""
super().__init__(*args, **kwargs)
self.port = port
self.bind_address = bind_address or '0.0.0.0'
self.listen_queue = listen_queue
def _process_client(self, sock, address):
def _f():
processed_bytes = 0
open_brackets = 0
msg = b''
prev_ch = None
redis = self._get_redis()
while True:
if processed_bytes > self._MAX_REQ_SIZE:
self.logger.warning('Ignoring message longer than {} bytes from {}'
.format(self._MAX_REQ_SIZE, address[0]))
return
ch = sock.recv(1)
processed_bytes += 1
if ch == b'':
break
if ch == b'{' and prev_ch != b'\\':
open_brackets += 1
if not open_brackets:
continue
msg += ch
if ch == b'}' and prev_ch != b'\\':
open_brackets -= 1
if not open_brackets:
break
prev_ch = ch
if msg == b'':
return
msg = Message.build(msg)
self.logger.info('Received request from {}: {}'.format(msg, address[0]))
self.on_message(msg)
response = self.get_message_response(msg)
self.logger.info('Processing response on the TCP backend: {}'.format(response))
if response:
sock.send(str(response).encode())
def _f_wrapper():
try:
_f()
finally:
sock.close()
threading.Thread(target=_f_wrapper).run()
def run(self):
super().run()
serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv_sock.bind((self.bind_address, self.port))
self.logger.info('Initialized TCP backend on port {} with bind address {}'.
format(self.port, self.bind_address))
serv_sock.listen(self.listen_queue)
while not self.should_stop():
(sock, address) = serv_sock.accept()
self.logger.info('Accepted connection from client {}'.format(address[0]))
self._process_client(sock, address)
# vim:sw=4:ts=4:et: