Refactored HTTP server to split the routes on separate files and keep the main Flask app object in a separate file as well so it can be easily wrapped by a WSGI instance

This commit is contained in:
Fabio Manganiello 2019-02-23 21:19:00 +01:00
parent 73383ab80f
commit 0e794cd1b0
17 changed files with 961 additions and 701 deletions

View file

@ -255,7 +255,7 @@ class Backend(Thread):
try: try:
redis = self._get_redis() redis = self._get_redis()
response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60)
if response and response[1]: if response and len(response) > 1:
response = Message.build(response[1]) response = Message.build(response[1])
else: else:
response = None response = None

View file

@ -1,31 +1,12 @@
import asyncio
import datetime
import dateutil.parser
import hashlib
import inspect
import json
import os import os
import re
import threading import threading
import time
from multiprocessing import Process from multiprocessing import Process
from flask import Flask, Response, abort, jsonify, request as http_request, \
render_template, send_from_directory
from redis import Redis from platypush.backend import Backend
from platypush.backend.http.app import app
from platypush.config import Config from platypush.context import get_or_create_event_loop
from platypush.context import get_backend, get_plugin, get_or_create_event_loop from platypush.utils import get_ssl_server_context, set_thread_name
from platypush.message import Message
from platypush.message.event import Event, StopEvent
from platypush.message.event.web.widget import WidgetUpdateEvent
from platypush.message.request import Request
from platypush.utils import get_ssl_server_context, set_thread_name, \
get_ip_or_hostname
from .. import Backend
from .media.handlers import MediaHandler
class HttpBackend(Backend): class HttpBackend(Backend):
@ -47,6 +28,13 @@ class HttpBackend(Backend):
* To display a fullscreen dashboard with your configured widgets, by default available under ``/dashboard`` * To display a fullscreen dashboard with your configured widgets, by default available under ``/dashboard``
* To stream media over HTTP through the ``/media`` endpoint
Any plugin can register custom routes under ``platypush/backend/http/app/routes/plugins``.
Any additional route is managed as a Flask blueprint template and the `.py`
module can expose lists of routes to the main webapp through the
``__routes__`` object (a list of Flask blueprints).
Note that if you set up a main token, it will be required for any HTTP Note that if you set up a main token, it will be required for any HTTP
interaction - either as ``X-Token`` HTTP header, on the query string interaction - either as ``X-Token`` HTTP header, on the query string
(attribute name: ``token``), as part of the JSON payload root (attribute (attribute name: ``token``), as part of the JSON payload root (attribute
@ -62,21 +50,13 @@ class HttpBackend(Backend):
support if you want to enable media streaming support if you want to enable media streaming
""" """
hidden_plugins = { _DEFAULT_HTTP_PORT = 8008
'assistant.google' _DEFAULT_WEBSOCKET_PORT = 8009
}
# Default size for the bytes chunk sent over the media streaming infra def __init__(self, port=_DEFAULT_HTTP_PORT,
_DEFAULT_STREAMING_CHUNK_SIZE = 4096 websocket_port=_DEFAULT_WEBSOCKET_PORT,
disable_websocket=False, dashboard={}, resource_dirs={},
# Maximum range size to be sent through the media streamer if Range header
# is not set
_DEFAULT_STREAMING_BLOCK_SIZE = 3145728
def __init__(self, port=8008, websocket_port=8009, disable_websocket=False,
redis_queue='platypush/http', dashboard={}, resource_dirs={},
ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None,
streaming_chunk_size=_DEFAULT_STREAMING_CHUNK_SIZE,
maps={}, **kwargs): maps={}, **kwargs):
""" """
:param port: Listen port for the web server (default: 8008) :param port: Listen port for the web server (default: 8008)
@ -88,9 +68,6 @@ class HttpBackend(Backend):
:param disable_websocket: Disable the websocket interface (default: False) :param disable_websocket: Disable the websocket interface (default: False)
:type disable_websocket: bool :type disable_websocket: bool
:param redis_queue: Name of the Redis queue used to synchronize messages with the web server process (default: ``platypush/http``)
:type redis_queue: str
:param ssl_cert: Set it to the path of your certificate file if you want to enable HTTPS (default: None) :param ssl_cert: Set it to the path of your certificate file if you want to enable HTTPS (default: None)
:type ssl_cert: str :type ssl_cert: str
@ -138,25 +115,17 @@ class HttpBackend(Backend):
db: "sqlite:////home/blacklight/.local/share/platypush/feeds/rss.db" db: "sqlite:////home/blacklight/.local/share/platypush/feeds/rss.db"
:type dashboard: dict :type dashboard: dict
:param streaming_chunk_size: Size for the chunks of bytes sent over the
media streaming infrastructure (default: 4096 bytes)
:type streaming_chunk_size: int
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.port = port self.port = port
self.websocket_port = websocket_port self.websocket_port = websocket_port
self.app = None
self.redis_queue = redis_queue
self.dashboard = dashboard self.dashboard = dashboard
self.maps = maps self.maps = maps
self.server_proc = None self.server_proc = None
self.disable_websocket = disable_websocket self.disable_websocket = disable_websocket
self.websocket_thread = None self.websocket_thread = None
self.redis_thread = None
self.redis = None
self.resource_dirs = { name: os.path.abspath( self.resource_dirs = { name: os.path.abspath(
os.path.expanduser(d)) for name, d in resource_dirs.items() } os.path.expanduser(d)) for name, d in resource_dirs.items() }
self.active_websockets = set() self.active_websockets = set()
@ -166,32 +135,15 @@ class HttpBackend(Backend):
ssl_capath=ssl_capath) \ ssl_capath=ssl_capath) \
if ssl_cert else None if ssl_cert else None
self.remote_base_url = '{proto}://{host}:{port}'.format(
proto=('https' if self.ssl_context else 'http'),
host=get_ip_or_hostname(), port=self.port)
self.local_base_url = '{proto}://localhost:{port}'.format(
proto=('https' if self.ssl_context else 'http'), port=self.port)
self._media_map_lock = threading.RLock()
def send_message(self, msg): def send_message(self, msg):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
def stop(self): def on_stop(self):
""" Stop the web server """ """ On backend stop """
self.logger.info('Received STOP event on HttpBackend') self.logger.info('Received STOP event on HttpBackend')
if self.redis_thread:
stop_evt = StopEvent(target=self.device_id, origin=self.device_id,
thread_id=self.redis_thread.ident)
redis = self._get_redis()
if redis:
redis.rpush(self.redis_queue, str(stop_evt))
if self.server_proc: if self.server_proc:
self.server_proc.terminate() self.server_proc.terminate()
self.server_proc.join() self.server_proc.join()
@ -216,516 +168,6 @@ class HttpBackend(Backend):
self.active_websockets.remove(websocket) self.active_websockets.remove(websocket)
def redis_poll(self):
""" Polls for new messages on the internal Redis queue """
while not self.should_stop():
redis = self._get_redis()
if not redis:
continue
msg = redis.blpop(self.redis_queue)
msg = Message.build(json.loads(msg[1].decode('utf-8')))
self.on_message(msg)
@classmethod
def _authenticate(cls):
return Response('Authentication required', 401,
{'WWW-Authenticate': 'Basic realm="Login required"'})
@classmethod
def _authentication_ok(cls):
token = Config.get('token')
if not token:
return True
user_token = None
# Check if
if 'X-Token' in http_request.headers:
user_token = http_request.headers['X-Token']
elif http_request.authorization:
# TODO support for user check
user_token = http_request.authorization.password
elif 'token' in http_request.args:
user_token = http_request.args.get('token')
else:
try:
args = json.loads(http_request.data.decode('utf-8'))
user_token = args.get('token')
except:
pass
if user_token == token:
return True
return False
def webserver(self):
""" Web server main process """
set_thread_name('WebServer')
basedir = os.path.dirname(inspect.getfile(self.__class__))
template_dir = os.path.join(basedir, 'templates')
media_map = {}
app = Flask(__name__, template_folder=template_dir)
self.redis_thread = threading.Thread(target=self.redis_poll)
self.redis_thread.start()
@app.route('/execute', methods=['POST'])
def execute():
""" Endpoint to execute commands """
if not self._authentication_ok(): return self._authenticate()
args = json.loads(http_request.data.decode('utf-8'))
msg = Message.build(args)
self.logger.info('Received message on the HTTP backend: {}'.format(msg))
if Config.get('token'):
msg.token = Config.get('token')
if isinstance(msg, Request):
msg.backend = self
msg.origin = 'http'
self.bus.post(msg)
if isinstance(msg, Request):
response = self.get_message_response(msg)
self.logger.info('Processing response on the HTTP backend: {}'.format(response))
if response:
return Response(str(response), mimetype='application/json')
@app.route('/')
def index():
""" Route to the main web panel """
if not self._authentication_ok(): return self._authenticate()
configured_plugins = Config.get_plugins()
enabled_plugins = {}
hidden_plugins = {}
for plugin, conf in configured_plugins.items():
template_file = os.path.join('plugins', plugin + '.html')
if os.path.isfile(os.path.join(template_dir, template_file)):
if plugin in self.hidden_plugins:
hidden_plugins[plugin] = conf
else:
enabled_plugins[plugin] = conf
return render_template('index.html', plugins=enabled_plugins,
hidden_plugins=hidden_plugins, utils=HttpUtils,
token=Config.get('token'),
websocket_port=self.websocket_port,
has_ssl=self.ssl_context is not None)
@app.route('/widget/<widget>', methods=['POST'])
def widget_update(widget):
""" ``POST /widget/<widget_id>`` will update the specified widget_id on the dashboard with the specified key-values """
event = WidgetUpdateEvent(
widget=widget, **(json.loads(http_request.data.decode('utf-8'))))
redis = self._get_redis()
if redis:
redis.rpush(self.redis_queue, str(event))
return jsonify({ 'status': 'ok' })
@app.route('/resources/<path:path>', methods=['GET'])
def static_path(path):
""" Static resources """
base_path = os.path.dirname(path).split('/')
while base_path:
if os.sep.join(base_path) in self.resource_dirs:
break
base_path.pop()
if not base_path:
abort(404)
base_path = os.sep.join(base_path)
real_base_path = self.resource_dirs[base_path]
real_path = real_base_path
file_path = [s for s in
re.sub(r'^{}(.*)$'.format(base_path), '\\1', path) \
.split('/') if s]
for p in file_path[:-1]:
real_path += os.sep + p
file_path.pop(0)
file_path = file_path.pop(0)
if not real_path.startswith(real_base_path):
# Directory climbing attempt
abort(404)
return send_from_directory(real_path, file_path)
def get_media_url(media_id):
return '{url}/media/{media_id}'.format(
url=self.remote_base_url, media_id=media_id)
def get_media_id(source):
return hashlib.sha1(source.encode()).hexdigest()
def register_media(source, subtitles=None):
media_id = get_media_id(source)
media_url = get_media_url(media_id)
with self._media_map_lock:
if media_id in media_map:
return media_map[media_id]
subfile = None
if subtitles:
try:
subfile = get_plugin('media.subtitles').download(
link=subtitles, convert_to_vtt=True
).output.get('filename')
except Exception as e:
self.logger.warning('Unable to load subtitle {}: {}'
.format(subtitles, str(e)))
media_hndl = MediaHandler.build(source, url=media_url,
subtitles=subfile)
media_map[media_id] = media_hndl
media_hndl.media_id = media_id
self.logger.info('Streaming "{}" on {}'.format(source, media_url))
return media_hndl
def unregister_media(source):
if source is None:
raise KeyError('No media_id specified')
media_id = get_media_id(source)
media_info = {}
with self._media_map_lock:
if media_id not in media_map:
raise FileNotFoundError('{} is not a registered media_id'.
format(source))
media_info = media_map.pop(media_id)
self.logger.info('Unregistered {} from {}'.format(
source, media_info.get('url')))
return media_info
def stream_media(media_id, request):
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.
format(media_id))
from_bytes = None
to_bytes = None
range_hdr = request.headers.get('range')
content_length = media_hndl.content_length
status_code = 200
headers = {
'Accept-Ranges': 'bytes',
'Content-Type': media_hndl.mime_type,
}
if 'download' in request.args:
headers['Content-Disposition'] = 'attachment' + \
('; filename="{}"'.format(media_hndl.filename) if
media_hndl.filename else '')
if range_hdr:
headers['Accept-Ranges'] = 'bytes'
from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-')
from_bytes = int(from_bytes)
if not to_bytes:
to_bytes = content_length-1
content_length -= from_bytes
else:
to_bytes = int(to_bytes)
content_length = to_bytes - from_bytes
status_code = 206
headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format(
start=from_bytes, end=to_bytes,
size=media_hndl.content_length)
else:
from_bytes = 0
to_bytes = self._DEFAULT_STREAMING_BLOCK_SIZE
headers['Content-Length'] = content_length
if 'webplayer' in request.args:
return render_template('webplayer.html',
media_url=media_hndl.url.replace(
self.remote_base_url, ''),
media_type=media_hndl.mime_type,
subtitles_url='/media/subtitles/{}.vtt'.
format(media_id) if media_hndl.subtitles
else None)
else:
return Response(media_hndl.get_data(
from_bytes=from_bytes, to_bytes=to_bytes,
chunk_size=self._DEFAULT_STREAMING_CHUNK_SIZE),
status_code, headers=headers, mimetype=headers['Content-Type'],
direct_passthrough=True)
def add_subtitles(media_id, req):
"""
This route can be used to download and/or expose subtitles files
associated to a media file
"""
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.
format(media_id))
subfile = None
if req.data:
try:
subfile = json.loads(req.data.decode('utf-8')) \
.get('filename')
if not subfile:
raise AttributeError
except Exception as e:
raise AttributeError(400, 'No filename in the request: {}'
.format(str(e)))
if not subfile:
if not media_hndl.path:
raise NotImplementedError(
'Subtitles are currently only supported for ' +
'local media files')
try:
subtitles = get_plugin('media.subtitles').get_subtitles(
media_hndl.path)
except Exception as e:
raise RuntimeError('Could not get subtitles: {}'.
format(str(e)))
if not subtitles:
raise FileNotFoundError(
'No subtitles found for resource {}'.format(
media_hndl.path))
subfile = get_plugin('media.subtitles').download(
link=subtitles[0].get('SubDownloadLink'),
media_resource=media_hndl.path,
convert_to_vtt=True).get('filename')
media_hndl.set_subtitles(subfile)
return {
'filename': subfile,
'url': self.remote_base_url + '/media/subtitles/' + media_id + '.vtt',
}
def remove_subtitles(media_id):
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.
format(media_id))
if not media_hndl.subtitles:
raise FileNotFoundError('{} has no subtitles attached'.
format(media_id))
media_hndl.remove_subtitles()
return {}
@app.route('/media/subtitles/<media_id>.vtt', methods=['GET', 'POST', 'DELETE'])
def handle_subtitles(media_id):
"""
This route can be used to download and/or expose subtitle files
associated to a media file
"""
if http_request.method == 'GET':
media_hndl = media_map.get(media_id)
if not media_hndl:
abort(404, 'No such media')
if not media_hndl.subtitles:
abort(404, 'The media has no subtitles attached')
return send_from_directory(
os.path.dirname(media_hndl.subtitles),
os.path.basename(media_hndl.subtitles),
mimetype='text/vtt')
try:
if http_request.method == 'DELETE':
return jsonify(remove_subtitles(media_id))
else:
return jsonify(add_subtitles(media_id, http_request))
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except NotImplementedError as e:
abort(422, str(e))
except Exception as e:
abort(500, str(e))
@app.route('/media', methods=['GET', 'PUT'])
def add_or_get_media():
"""
This route can be used by the `media` plugin to add streaming
content over HTTP or to get the list of registered streams
"""
if http_request.method == 'GET':
return jsonify([dict(media) for media in media_map.values()])
args = {}
try:
args = json.loads(http_request.data.decode('utf-8'))
except:
abort(400, 'Invalid JSON request')
source = args.get('source')
if not source:
abort(400, 'The request does not contain any source')
subtitles = args.get('subtitles')
try:
media_hndl = register_media(source, subtitles)
ret = dict(media_hndl)
if media_hndl.subtitles:
ret['subtitles_url'] = self.remote_base_url + \
'/media/subtitles/' + media_hndl.media_id + '.vtt'
return jsonify(ret)
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except Exception as e:
self.logger.exception(e)
abort(500, str(e))
@app.route('/media/<media_id>', methods=['GET', 'DELETE'])
def stream_or_delete_media(media_id):
"""
This route can be used to stream active media points or unregister
a mounted media stream
"""
# Remove the extension
media_id = '.'.join(media_id.split('.')[:-1])
try:
if http_request.method == 'GET':
if media_id is None:
return jsonify(media_map)
else:
return stream_media(media_id, http_request)
else:
media_info = unregister_media(media_id)
return jsonify(media_info)
except (AttributeError, FileNotFoundError) as e:
abort(404, str(e))
except KeyError as e:
abort(400, str(e))
except Exception as e:
self.logger.exception(e)
abort(500, str(e))
@app.route('/dashboard', methods=['GET'])
def dashboard():
""" Route for the fullscreen dashboard """
if not self._authentication_ok(): return self._authenticate()
return render_template('dashboard.html', config=self.dashboard, utils=HttpUtils,
token=Config.get('token'), websocket_port=self.websocket_port)
@app.route('/map', methods=['GET'])
def map():
"""
Query parameters:
start -- Map timeline start timestamp
end -- Map timeline end timestamp
zoom -- Between 1-20. Set it if you want to override the
Google's API auto-zoom. You may have to set it if you are
trying to embed the map into an iframe
Supported values for `start` and `end`:
- now
- yesterday
- -30s (it means '30 seconds ago')
- -10m (it means '10 minutes ago')
- -24h (it means '24 hours ago')
- -7d (it means '7 days ago')
- 2018-06-04T17:39:22.742Z (ISO strings)
Default: start=yesterday, end=now
"""
def parse_time(time_string):
if not time_string:
return None
now = datetime.datetime.now()
if time_string == 'now':
return now.isoformat()
if time_string == 'yesterday':
return (now - datetime.timedelta(days=1)).isoformat()
try:
return dateutil.parser.parse(time_string).isoformat()
except ValueError:
pass
m = re.match('([-+]?)([0-9]+)([dhms])', time_string)
if not m:
raise RuntimeError('Invalid time interval string representation: "{}"'.
format(time_string))
time_delta = (-1 if m.group(1) == '-' else 1) * int(m.group(2))
time_unit = m.group(3)
if time_unit == 'd':
params = { 'days': time_delta }
elif time_unit == 'h':
params = { 'hours': time_delta }
elif time_unit == 'm':
params = { 'minutes': time_delta }
elif time_unit == 's':
params = { 'seconds': time_delta }
return (now + datetime.timedelta(**params)).isoformat()
if not self._authentication_ok(): return self._authenticate()
try:
api_key = self.maps['api_key']
except KeyError:
raise RuntimeError('Google Maps api_key not set in the maps configuration')
start = parse_time(http_request.args.get('start', default='yesterday'))
end = parse_time(http_request.args.get('end', default='now'))
zoom = http_request.args.get('zoom', default=None)
return render_template('map.html', config=self.maps,
utils=HttpUtils, start=start, end=end,
zoom=zoom, token=Config.get('token'), api_key=api_key,
websocket_port=self.websocket_port)
return app
def websocket(self): def websocket(self):
""" Websocket main server """ """ Websocket main server """
import websockets import websockets
@ -754,120 +196,36 @@ class HttpBackend(Backend):
**websocket_args)) **websocket_args))
loop.run_forever() loop.run_forever()
def _start_web_server(self):
def proc():
kwargs = {
'host': '0.0.0.0',
'port': self.port,
'use_reloader': False,
'debug': False,
}
if self.ssl_context:
kwargs['ssl_context'] = self.ssl_context
app.run(**kwargs)
return proc
def run(self): def run(self):
super().run() super().run()
os.putenv('FLASK_APP', 'platypush')
os.putenv('FLASK_ENV', 'production')
kwargs = {
'host':'0.0.0.0', 'port':self.port, 'use_reloader':False
}
if self.ssl_context: self.server_proc = Process(target=self._start_web_server(),
kwargs['ssl_context'] = self.ssl_context name='WebServer')
self.logger.info('Initialized HTTP backend on port {}'.format(self.port))
self.app = self.webserver()
self.app.debug = False
self.server_proc = Process(target=self.app.run,
name='WebServer',
kwargs=kwargs)
self.server_proc.start() self.server_proc.start()
if not self.disable_websocket: if not self.disable_websocket:
self.websocket_thread = threading.Thread(target=self.websocket) self.websocket_thread = threading.Thread(target=self.websocket)
self.websocket_thread.start() self.websocket_thread.start()
self.logger.info('Initialized HTTP backend on port {}'.format(self.port))
self.server_proc.join() self.server_proc.join()
class HttpUtils(object):
@staticmethod
def widget_columns_to_html_class(columns):
if not isinstance(columns, int):
raise RuntimeError('columns should be a number, got "{}"'.format(columns))
if columns == 1:
return 'one column'
elif columns == 2:
return 'two columns'
elif columns == 3:
return 'three columns'
elif columns == 4:
return 'four columns'
elif columns == 5:
return 'five columns'
elif columns == 6:
return 'six columns'
elif columns == 7:
return 'seven columns'
elif columns == 8:
return 'eight columns'
elif columns == 9:
return 'nine columns'
elif columns == 10:
return 'ten columns'
elif columns == 11:
return 'eleven columns'
elif columns == 12:
return 'twelve columns'
else:
raise RuntimeError('Constraint violation: should be 1 <= columns <= 12, ' +
'got columns={}'.format(columns))
@staticmethod
def search_directory(directory, *extensions, recursive=False):
files = []
if recursive:
for root, subdirs, files in os.walk(directory):
for file in files:
if not extensions or os.path.splitext(file)[1].lower() in extensions:
files.append(os.path.join(root, file))
else:
for file in os.listdir(directory):
if not extensions or os.path.splitext(file)[1].lower() in extensions:
files.append(os.path.join(directory, file))
return files
@classmethod
def search_web_directory(cls, directory, *extensions):
directory = os.path.abspath(os.path.expanduser(directory))
resource_dirs = get_backend('http').resource_dirs
resource_path = None
uri = ''
for name, resource_path in resource_dirs.items():
if directory.startswith(resource_path):
subdir = re.sub('^{}(.*)$'.format(resource_path),
'\\1', directory)
uri = '/resources/' + name
break
if not uri:
raise RuntimeError('Directory {} not found among the available ' +
'static resources on the webserver'.format(
directory))
results = [
re.sub('^{}(.*)$'.format(resource_path), uri + '\\1', path)
for path in cls.search_directory(directory, *extensions)
]
return results
@classmethod
def to_json(cls, data):
return json.dumps(data)
@classmethod
def from_json(cls, data):
return json.loads(data)
@classmethod
def get_config(cls, attr):
return Config.get(attr)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,24 @@
import os
from flask import Flask
from platypush.backend.http.utils import HttpUtils
from platypush.backend.http.app.utils import get_routes
## Webapp initialization
base_folder = os.path.abspath(os.path.join(
os.path.dirname(os.path.abspath(__file__)), '..'))
template_folder = os.path.join(base_folder, 'templates')
static_folder = os.path.join(base_folder, 'static')
app = Flask('platypush', template_folder=template_folder,
static_folder=static_folder)
for route in get_routes():
app.register_blueprint(route)
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,30 @@
from flask import Blueprint, request, render_template
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, authentication_ok, \
get_websocket_port
from platypush.backend.http.utils import HttpUtils
from platypush.config import Config
dashboard = Blueprint('dashboard', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
dashboard,
]
@dashboard.route('/dashboard', methods=['GET'])
def dashboard():
""" Route for the fullscreen dashboard """
if not authentication_ok(request): return authenticate()
http_conf = Config.get('backend.http')
dashboard_conf = http_conf.get('dashboard', {})
return render_template('dashboard.html', config=dashboard_conf,
utils=HttpUtils, token=Config.get('token'),
websocket_port=get_websocket_port())
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,36 @@
import json
from flask import Blueprint, abort, request, Response
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, authentication_ok, \
logger, send_message
from platypush.backend.http.utils import HttpUtils
execute = Blueprint('execute', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
execute,
]
@execute.route('/execute', methods=['POST'])
def execute():
""" Endpoint to execute commands """
if not authentication_ok(request): return authenticate()
msg = json.loads(request.data.decode('utf-8'))
logger().info('Received message on the HTTP backend: {}'.format(msg))
try:
response = send_message(msg)
return Response(str(response or {}), mimetype='application/json')
except Exception as e:
logger().error('Error while running HTTP action: {}. Request: {}'.
format(str(e), msg))
abort(500, str(e))
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,52 @@
import os
from flask import Blueprint, request, render_template
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, authentication_ok, \
get_websocket_port
from platypush.backend.http.utils import HttpUtils
from platypush.config import Config
index = Blueprint('index', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
index,
]
@index.route('/')
def index():
""" Route to the main web panel """
if not authentication_ok(request): return authenticate()
# These plugins have their own template file but won't be shown as a tab in
# the web panel. This is usually the case for plugins that only include JS
# code but no template content.
_hidden_plugins = {
'assistant.google'
}
configured_plugins = Config.get_plugins()
enabled_plugins = {}
hidden_plugins = {}
for plugin, conf in configured_plugins.items():
template_file = os.path.join('plugins', plugin + '.html')
if os.path.isfile(os.path.join(template_folder, template_file)):
if plugin in _hidden_plugins:
hidden_plugins[plugin] = conf
else:
enabled_plugins[plugin] = conf
http_conf = Config.get('backend.http')
return render_template('index.html', plugins=enabled_plugins,
hidden_plugins=hidden_plugins, utils=HttpUtils,
token=Config.get('token'),
websocket_port=get_websocket_port(),
has_ssl=http_conf.get('ssl_cert') is not None)
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,95 @@
import datetime
import dateutil.parser
from flask import abort, request, render_template, Blueprint
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, authentication_ok
from platypush.config import Config
map_ = Blueprint('map', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
map_,
]
def parse_time(time_string):
if not time_string:
return None
now = datetime.datetime.now()
if time_string == 'now':
return now.isoformat()
if time_string == 'yesterday':
return (now - datetime.timedelta(days=1)).isoformat()
try:
return dateutil.parser.parse(time_string).isoformat()
except ValueError:
pass
m = re.match('([-+]?)([0-9]+)([dhms])', time_string)
if not m:
raise RuntimeError('Invalid time interval string representation: "{}"'.
format(time_string))
time_delta = (-1 if m.group(1) == '-' else 1) * int(m.group(2))
time_unit = m.group(3)
if time_unit == 'd':
params = { 'days': time_delta }
elif time_unit == 'h':
params = { 'hours': time_delta }
elif time_unit == 'm':
params = { 'minutes': time_delta }
elif time_unit == 's':
params = { 'seconds': time_delta }
return (now + datetime.timedelta(**params)).isoformat()
@map_.route('/map', methods=['GET'])
def map():
"""
Query parameters:
start -- Map timeline start timestamp
end -- Map timeline end timestamp
zoom -- Between 1-20. Set it if you want to override the
Google's API auto-zoom. You may have to set it if you are
trying to embed the map into an iframe
Supported values for `start` and `end`:
- now
- yesterday
- -30s (it means '30 seconds ago')
- -10m (it means '10 minutes ago')
- -24h (it means '24 hours ago')
- -7d (it means '7 days ago')
- 2018-06-04T17:39:22.742Z (ISO strings)
Default: start=yesterday, end=now
"""
if not authentication_ok(request): return authenticate()
map_conf = (Config.get('backend.http') or {}).get('maps', {})
if not map_conf:
abort(500, 'The maps plugin is not configured in backend.http')
api_key = map_conf.get('api_key')
if not api_key:
abort(500, 'Google Maps api_key not set in the maps configuration')
start = parse_time(request.args.get('start', default='yesterday'))
end = parse_time(request.args.get('end', default='now'))
zoom = request.args.get('zoom', default=None)
return render_template('map.html', config=map_conf, utils=HttpUtils,
start=start, end=end, zoom=zoom,
token=Config.get('token'), api_key=api_key,
websocket_port=get_websocket_port())
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,217 @@
import hashlib
import json
import threading
from flask import Response, render_template
from platypush.backend.http.app.utils import get_remote_base_url, logger, \
send_message
from platypush.backend.http.media.handlers import MediaHandler
media_map = {}
media_map_lock = threading.RLock()
# Size for the bytes chunk sent over the media streaming infra
STREAMING_CHUNK_SIZE = 4096
# Maximum range size to be sent through the media streamer if Range header
# is not set
STREAMING_BLOCK_SIZE = 3145728
def get_media_url(media_id):
return '{url}/media/{media_id}'.format(
url=get_remote_base_url(), media_id=media_id)
def get_media_id(source):
return hashlib.sha1(source.encode()).hexdigest()
def register_media(source, subtitles=None):
global media_map, media_map_lock
media_id = get_media_id(source)
media_url = get_media_url(media_id)
with media_map_lock:
if media_id in media_map:
return media_map[media_id]
subfile = None
if subtitles:
req = {
'type': 'request',
'action': 'media.subtitles.download',
'args': {
'link': subtitles,
'convert_to_vtt': True,
}
}
try:
subfile = (send_message(req).output or {}).get('filename')
except Exception as e:
logger().warning('Unable to load subtitle {}: {}'
.format(subtitles, str(e)))
with media_map_lock:
media_hndl = MediaHandler.build(source, url=media_url, subtitles=subfile)
media_map[media_id] = media_hndl
media_hndl.media_id = media_id
logger().info('Streaming "{}" on {}'.format(source, media_url))
return media_hndl
def unregister_media(source):
global media_map, media_map_lock
if source is None:
raise KeyError('No media_id specified')
media_id = get_media_id(source)
media_info = {}
with media_map_lock:
if media_id not in media_map:
raise FileNotFoundError('{} is not a registered media_id'.
format(source))
media_info = media_map.pop(media_id)
logger().info('Unregistered {} from {}'.format(source, media_info.get('url')))
return media_info
def stream_media(media_id, req):
global STREAMING_BLOCK_SIZE, STREAMING_CHUNK_SIZE
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.format(media_id))
from_bytes = None
to_bytes = None
range_hdr = req.headers.get('range')
content_length = media_hndl.content_length
status_code = 200
headers = {
'Accept-Ranges': 'bytes',
'Content-Type': media_hndl.mime_type,
}
if 'download' in req.args:
headers['Content-Disposition'] = 'attachment' + \
('; filename="{}"'.format(media_hndl.filename) if
media_hndl.filename else '')
if range_hdr:
headers['Accept-Ranges'] = 'bytes'
from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-')
from_bytes = int(from_bytes)
if not to_bytes:
to_bytes = content_length-1
content_length -= from_bytes
else:
to_bytes = int(to_bytes)
content_length = to_bytes - from_bytes
status_code = 206
headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format(
start=from_bytes, end=to_bytes,
size=media_hndl.content_length)
else:
from_bytes = 0
to_bytes = STREAMING_BLOCK_SIZE
headers['Content-Length'] = content_length
if 'webplayer' in req.args:
return render_template('webplayer.html',
media_url=media_hndl.url.replace(
get_remote_base_url(), ''),
media_type=media_hndl.mime_type,
subtitles_url='/media/subtitles/{}.vtt'.
format(media_id) if media_hndl.subtitles
else None)
else:
return Response(media_hndl.get_data(
from_bytes=from_bytes, to_bytes=to_bytes,
chunk_size=STREAMING_CHUNK_SIZE),
status_code, headers=headers, mimetype=headers['Content-Type'],
direct_passthrough=True)
def add_subtitles(media_id, req):
"""
This route can be used to download and/or expose subtitles files
associated to a media file
"""
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.format(media_id))
subfile = None
if req.data:
subfile = json.loads(req.data.decode('utf-8')).get('filename')
if not subfile:
raise AttributeError('No filename specified in the request')
if not subfile:
if not media_hndl.path:
raise NotImplementedError(
'Subtitles are currently only supported for local media files')
req = {
'type': 'request',
'action': 'media.subtitles.get_subtitles',
'args': {
'resource': media_hndl.path,
}
}
try:
subtitles = send_message(req).output or []
except Exception as e:
raise RuntimeError('Could not get subtitles: {}'.format(str(e)))
if not subtitles:
raise FileNotFoundError('No subtitles found for resource {}'.
format(media_hndl.path))
req = {
'type': 'request',
'action': 'media.subtitles.download',
'args': {
'link': subtitles[0].get('SubDownloadLink'),
'media_resource': media_hndl.path,
'convert_to_vtt': True,
}
}
subfile = (send_message(req).output or {}).get('filename')
media_hndl.set_subtitles(subfile)
return {
'filename': subfile,
'url': get_remote_base_url() + '/media/subtitles/' + media_id + '.vtt',
}
def remove_subtitles(media_id):
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.
format(media_id))
if not media_hndl.subtitles:
raise FileNotFoundError('{} has no subtitles attached'.
format(media_id))
media_hndl.remove_subtitles()
return {}
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,86 @@
import json
from flask import abort, jsonify, request, Blueprint
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import logger, get_remote_base_url
from platypush.backend.http.app.routes.plugins.media import media_map, \
stream_media, register_media, unregister_media
media = Blueprint('media', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
media,
]
@media.route('/media', methods=['GET'])
def get_media():
"""
This route can be used to get the list of registered streams
"""
return jsonify([dict(media) for media in media_map.values()])
@media.route('/media', methods=['PUT'])
def add_media():
"""
This route can be used by the `media` plugin to add streaming content over HTTP
"""
args = {}
try:
args = json.loads(request.data.decode('utf-8'))
except:
abort(400, 'Invalid JSON request')
source = args.get('source')
if not source:
abort(400, 'The request does not contain any source')
subtitles = args.get('subtitles')
try:
media_hndl = register_media(source, subtitles)
ret = dict(media_hndl)
if media_hndl.subtitles:
ret['subtitles_url'] = get_remote_base_url() + \
'/media/subtitles/' + media_hndl.media_id + '.vtt'
return jsonify(ret)
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except Exception as e:
logger().exception(e)
abort(500, str(e))
@media.route('/media/<media_id>', methods=['GET', 'DELETE'])
def stream_or_delete_media(media_id):
"""
This route can be used to stream active media points or unregister
a mounted media stream
"""
# Remove the extension
media_id = '.'.join(media_id.split('.')[:-1])
try:
if request.method == 'GET':
if media_id is None:
return jsonify([dict(media) for media in media_map.values()])
else:
return stream_media(media_id, request)
else:
media_info = unregister_media(media_id)
return jsonify(media_info)
except (AttributeError, FileNotFoundError) as e:
abort(404, str(e))
except KeyError as e:
abort(400, str(e))
except Exception as e:
logger().exception(e)
abort(500, str(e))
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,51 @@
import os
from flask import abort, jsonify, request, send_from_directory, Blueprint
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.routes.plugins.media import media_map, \
remove_subtitles, add_subtitles
subtitles = Blueprint('subtitles', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
subtitles,
]
@subtitles.route('/media/subtitles/<media_id>.vtt', methods=['GET', 'POST', 'DELETE'])
def handle_subtitles(media_id):
"""
This route can be used to download and/or expose subtitle files
associated to a media file
"""
if request.method == 'GET':
media_hndl = media_map.get(media_id)
if not media_hndl:
abort(404, 'No such media')
if not media_hndl.subtitles:
abort(404, 'The media has no subtitles attached')
return send_from_directory(
os.path.dirname(media_hndl.subtitles),
os.path.basename(media_hndl.subtitles),
mimetype='text/vtt')
try:
if request.method == 'DELETE':
return jsonify(remove_subtitles(media_id))
else:
return jsonify(add_subtitles(media_id, request))
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except NotImplementedError as e:
abort(422, str(e))
except Exception as e:
abort(500, str(e))
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,55 @@
import os
import re
from flask import Blueprint, abort, send_from_directory
from platypush.config import Config
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import authenticate, authentication_ok, \
send_message
resources = Blueprint('resources', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
resources,
]
@resources.route('/resources/<path:path>', methods=['GET'])
def resources_path(path):
""" Custom static resources """
path_tokens = path.split('/')
filename = path_tokens.pop(-1)
http_conf = Config.get('backend.http')
resource_dirs = http_conf.get('resource_dirs', {})
while path_tokens:
if '/'.join(path_tokens) in resource_dirs:
break
path_tokens.pop()
if not path_tokens:
# Requested resource not found in the allowed resource_dirs
abort(404)
base_path = '/'.join(path_tokens)
real_base_path = os.path.abspath(os.path.expanduser(resource_dirs[base_path]))
real_path = real_base_path
file_path = [s for s in re.sub(r'^{}(.*)$'.format(base_path), '\\1', path)
.split('/') if s]
for p in file_path[:-1]:
real_path += os.sep + p
file_path.pop(0)
file_path = file_path.pop(0)
if not real_path.startswith(real_base_path):
# Directory climbing attempt
abort(404)
return send_from_directory(real_path, file_path)
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,155 @@
import importlib
import json
import logging
import os
import sys
from flask import Response
from redis import Redis
# NOTE: The HTTP service will *only* work on top of a Redis bus. The default
# internal bus service won't work as the web server will run in a different process.
from platypush.bus.redis import RedisBus
from platypush.config import Config
from platypush.message import Message
from platypush.message.request import Request
from platypush.utils import get_redis_queue_name_by_message, get_ip_or_hostname
_bus = None
_logger = None
def bus():
global _bus
if _bus is None:
_bus = RedisBus()
return _bus
def logger():
global _logger
if not _logger:
log_args = {
'level': logging.INFO,
'format': '%(asctime)-15s|%(levelname)5s|%(name)s|%(message)s',
}
level = (Config.get('backend.http') or {}).get('logging') or \
(Config.get('logging') or {}).get('level')
filename = (Config.get('backend.http') or {}).get('filename')
if level:
log_args['level'] = getattr(logging, level.upper()) \
if isinstance(level, str) else level
if filename:
log_args['filename'] = filename
logging.basicConfig(**log_args)
_logger = logging.getLogger('platyweb')
return _logger
def get_message_response(msg):
redis = Redis(**bus().redis_args)
response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60)
if response and len(response) > 1:
response = Message.build(response[1])
else:
response = None
return response
def get_http_port():
from platypush.backend.http import HttpBackend
http_conf = Config.get('backend.http')
return http_conf.get('port', HttpBackend._DEFAULT_HTTP_PORT)
def get_websocket_port():
from platypush.backend.http import HttpBackend
http_conf = Config.get('backend.http')
return http_conf.get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT)
def send_message(msg):
msg = Message.build(msg)
if isinstance(msg, Request):
msg.origin = 'http'
if Config.get('token'):
msg.token = Config.get('token')
bus().post(msg)
if isinstance(msg, Request):
response = get_message_response(msg)
logger().info('Processing response on the HTTP backend: {}'.
format(response))
return response
def authenticate():
return Response('Authentication required', 401,
{'WWW-Authenticate': 'Basic realm="Login required"'})
def authentication_ok(req):
token = Config.get('token')
if not token:
return True
user_token = None
# Check if
if 'X-Token' in req.headers:
user_token = req.headers['X-Token']
elif req.authorization:
# TODO support for user check
user_token = req.authorization.password
elif 'token' in req.args:
user_token = req.args.get('token')
else:
try:
args = json.loads(req.data.decode('utf-8'))
user_token = args.get('token')
except:
pass
if user_token == token:
return True
return False
def get_routes():
routes_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'routes')
routes = []
for path, dirs, files in os.walk(routes_dir):
for f in files:
if f.endswith('.py'):
sys.path.insert(0, path)
try:
mod = importlib.import_module('.'.join(f.split('.')[:-1]))
if hasattr(mod, '__routes__'):
routes.extend(mod.__routes__)
except Exception as e:
logger().warning('Could not import routes from {}/{}: {}: {}'.
format(path, f, type(e), str(e)))
return routes
def get_local_base_url():
http_conf = Config.get('backend.http') or {}
return '{proto}://localhost:{port}'.format(
proto=('https' if http_conf.get('ssl_cert') else 'http'),
port=get_http_port())
def get_remote_base_url():
http_conf = Config.get('backend.http') or {}
return '{proto}://{host}:{port}'.format(
proto=('https' if http_conf.get('ssl_cert') else 'http'),
host=get_ip_or_hostname(), port=get_http_port())
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,97 @@
import json
import os
import re
from platypush.config import Config
class HttpUtils(object):
@staticmethod
def widget_columns_to_html_class(columns):
if not isinstance(columns, int):
raise RuntimeError('columns should be a number, got "{}"'.format(columns))
if columns == 1:
return 'one column'
elif columns == 2:
return 'two columns'
elif columns == 3:
return 'three columns'
elif columns == 4:
return 'four columns'
elif columns == 5:
return 'five columns'
elif columns == 6:
return 'six columns'
elif columns == 7:
return 'seven columns'
elif columns == 8:
return 'eight columns'
elif columns == 9:
return 'nine columns'
elif columns == 10:
return 'ten columns'
elif columns == 11:
return 'eleven columns'
elif columns == 12:
return 'twelve columns'
else:
raise RuntimeError('Constraint violation: should be 1 <= columns <= 12, ' +
'got columns={}'.format(columns))
@staticmethod
def search_directory(directory, *extensions, recursive=False):
files = []
if recursive:
for root, subdirs, files in os.walk(directory):
for file in files:
if not extensions or os.path.splitext(file)[1].lower() in extensions:
files.append(os.path.join(root, file))
else:
for file in os.listdir(directory):
if not extensions or os.path.splitext(file)[1].lower() in extensions:
files.append(os.path.join(directory, file))
return files
@classmethod
def search_web_directory(cls, directory, *extensions):
directory = os.path.abspath(os.path.expanduser(directory))
resource_dirs = (Config.get('backend.http') or {}).get('resource_dirs', {})
resource_path = None
uri = ''
for name, resource_path in resource_dirs.items():
resource_path = os.path.abspath(os.path.expanduser(resource_path))
if directory.startswith(resource_path):
subdir = re.sub('^{}(.*)$'.format(resource_path),
'\\1', directory)
uri = '/resources/' + name
break
if not uri:
raise RuntimeError(('Directory {} not found among the available ' +
'static resources on the webserver').format(
directory))
results = [
re.sub('^{}(.*)$'.format(resource_path), uri + '\\1', path)
for path in cls.search_directory(directory, *extensions)
]
return results
@classmethod
def to_json(cls, data):
return json.dumps(data)
@classmethod
def from_json(cls, data):
return json.loads(data)
@classmethod
def get_config(cls, attr):
return Config.get(attr)
# vim:sw=4:ts=4:et:

View file

@ -8,6 +8,7 @@ from platypush.backend import Backend
from platypush.context import get_plugin, get_or_create_event_loop from platypush.context import get_plugin, get_or_create_event_loop
from platypush.message import Message from platypush.message import Message
from platypush.message.request import Request from platypush.message.request import Request
from platypush.message.response import Response
from platypush.utils import get_ssl_server_context from platypush.utils import get_ssl_server_context
@ -23,8 +24,11 @@ class WebsocketBackend(Backend):
# Websocket client message recv timeout in seconds # Websocket client message recv timeout in seconds
_websocket_client_timeout = 0 _websocket_client_timeout = 0
def __init__(self, port=8765, bind_address='0.0.0.0', ssl_cafile=None, # Default websocket service port
ssl_capath=None, ssl_cert=None, ssl_key=None, _default_websocket_port = 8765
def __init__(self, port=_default_websocket_port, bind_address='0.0.0.0',
ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None,
client_timeout=_websocket_client_timeout, **kwargs): client_timeout=_websocket_client_timeout, **kwargs):
""" """
:param port: Listen port for the websocket server (default: 8765) :param port: Listen port for the websocket server (default: 8765)
@ -116,26 +120,22 @@ class WebsocketBackend(Backend):
self.on_message(msg) self.on_message(msg)
if isinstance(msg, Request): if isinstance(msg, Request):
response = self.get_message_response(msg) response = self.get_message_response(msg) or Response()
if not response:
return
self.logger.info('Processing response on the websocket backend: {}'. self.logger.info('Processing response on the websocket backend: {}'.
format(response)) format(response))
await websocket.send(str(response)) await websocket.send(str(response))
except websockets.exceptions.ConnectionClosed as e:
self.active_websockets.remove(websocket)
self.logger.debug('Websocket client {} closed connection'.
format(websocket.remote_address[0]))
except asyncio.TimeoutError as e:
self.active_websockets.remove(websocket)
self.logger.debug('Websocket connection to {} timed out'.
format(websocket.remote_address[0]))
except Exception as e: except Exception as e:
if isinstance(e, websockets.exceptions.ConnectionClosed): self.logger.exception(e)
self.active_websockets.remove(websocket)
self.logger.debug('Websocket client {} closed connection'.
format(websocket.remote_address[0]))
elif isinstance(e, asyncio.TimeoutError):
self.active_websockets.remove(websocket)
self.logger.debug('Websocket connection to {} timed out'.
format(websocket.remote_address[0]))
else:
self.logger.exception(e)
self.logger.info('Initialized websocket backend on port {}, bind address: {}'. self.logger.info('Initialized websocket backend on port {}, bind address: {}'.
format(self.port, self.bind_address)) format(self.port, self.bind_address))

View file

@ -5,6 +5,7 @@ import threading
from redis import Redis from redis import Redis
from platypush.bus import Bus from platypush.bus import Bus
from platypush.config import Config
from platypush.message import Message from platypush.message import Message
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -17,7 +18,12 @@ class RedisBus(Bus):
def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE, def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE,
*args, **kwargs): *args, **kwargs):
super().__init__(on_message=on_message) super().__init__(on_message=on_message)
if not args and not kwargs:
kwargs = (Config.get('backend.redis') or {}).get('redis_args', {})
self.redis = Redis(*args, **kwargs) self.redis = Redis(*args, **kwargs)
self.redis_args = kwargs
self.redis_queue = redis_queue self.redis_queue = redis_queue
self.on_message = on_message self.on_message = on_message
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()

View file

@ -217,9 +217,7 @@ class Request(Message):
# Retry mechanism # Retry mechanism
plugin.logger.exception(e) plugin.logger.exception(e)
logger.warning(('Uncaught exception while processing response ' + logger.warning(('Uncaught exception while processing response ' +
'from action [{}] from plugin {}: {}').format( 'from action [{}]: {}').format(self.action, str(e)))
self.action, plugin.__class__.__name__,
str(e)))
errors = errors or [] errors = errors or []
if str(e) not in errors: if str(e) not in errors: