48 lines
1.4 KiB
Python
48 lines
1.4 KiB
Python
from threading import Thread, current_thread
|
|
from typing import Set
|
|
|
|
from platypush.backend.http.app.utils import send_message
|
|
from platypush.message.request import Request
|
|
|
|
from . import WSRoute, logger
|
|
|
|
|
|
class WSRequestsProxy(WSRoute):
|
|
"""
|
|
Websocket event proxy mapped to ``/ws/requests``.
|
|
"""
|
|
|
|
_max_concurrent_requests: int = 10
|
|
""" Maximum number of concurrent requests allowed on the same connection. """
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self._requests: Set[Thread] = set()
|
|
|
|
@classmethod
|
|
def app_name(cls) -> str:
|
|
return 'requests'
|
|
|
|
def _handle_request(self, request: Request):
|
|
self._requests.add(current_thread())
|
|
try:
|
|
response = send_message(request, wait_for_response=True)
|
|
self.send(str(response))
|
|
finally:
|
|
self._requests.remove(current_thread())
|
|
|
|
def on_message(self, message):
|
|
if len(self._requests) > self._max_concurrent_requests:
|
|
logger.info('Too many concurrent requests on %s', self)
|
|
return
|
|
|
|
try:
|
|
msg = Request.build(message)
|
|
assert isinstance(msg, Request), f'Expected {Request}, got {type(msg)}'
|
|
except Exception as e:
|
|
logger.info('Could not build request from %s: %s', message, e)
|
|
logger.exception(e)
|
|
return
|
|
|
|
Thread(target=self._handle_request, args=(msg,)).start()
|