From 0e794cd1b0f0b31ac91e8e526c3cbd5ffef7f763 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 23 Feb 2019 21:19:00 +0100 Subject: [PATCH] Refactored HTTP server to split the routes on separate files and keep the main Flask app object in a separate file as well so it can be easily wrapped by a WSGI instance --- platypush/backend/__init__.py | 2 +- platypush/backend/http/__init__.py | 718 +----------------- platypush/backend/http/app/__init__.py | 24 + platypush/backend/http/app/routes/__init__.py | 0 .../backend/http/app/routes/dashboard.py | 30 + platypush/backend/http/app/routes/execute.py | 36 + platypush/backend/http/app/routes/index.py | 52 ++ platypush/backend/http/app/routes/map.py | 95 +++ .../http/app/routes/plugins/media/__init__.py | 217 ++++++ .../http/app/routes/plugins/media/stream.py | 86 +++ .../app/routes/plugins/media/subtitles.py | 51 ++ .../backend/http/app/routes/resources.py | 55 ++ platypush/backend/http/app/utils.py | 155 ++++ platypush/backend/http/utils.py | 97 +++ platypush/backend/websocket.py | 34 +- platypush/bus/redis.py | 6 + platypush/message/request/__init__.py | 4 +- 17 files changed, 961 insertions(+), 701 deletions(-) create mode 100644 platypush/backend/http/app/__init__.py create mode 100644 platypush/backend/http/app/routes/__init__.py create mode 100644 platypush/backend/http/app/routes/dashboard.py create mode 100644 platypush/backend/http/app/routes/execute.py create mode 100644 platypush/backend/http/app/routes/index.py create mode 100644 platypush/backend/http/app/routes/map.py create mode 100644 platypush/backend/http/app/routes/plugins/media/__init__.py create mode 100644 platypush/backend/http/app/routes/plugins/media/stream.py create mode 100644 platypush/backend/http/app/routes/plugins/media/subtitles.py create mode 100644 platypush/backend/http/app/routes/resources.py create mode 100644 platypush/backend/http/app/utils.py create mode 100644 platypush/backend/http/utils.py diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 1f5dd778d1..c25f4143c7 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -255,7 +255,7 @@ class Backend(Thread): try: redis = self._get_redis() response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) - if response and response[1]: + if response and len(response) > 1: response = Message.build(response[1]) else: response = None diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 25964511ba..8ed60da5d3 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -1,31 +1,12 @@ -import asyncio -import datetime -import dateutil.parser -import hashlib -import inspect -import json import os -import re import threading -import time from multiprocessing import Process -from flask import Flask, Response, abort, jsonify, request as http_request, \ - render_template, send_from_directory -from redis import Redis - -from platypush.config import Config -from platypush.context import get_backend, get_plugin, get_or_create_event_loop -from platypush.message import Message -from platypush.message.event import Event, StopEvent -from platypush.message.event.web.widget import WidgetUpdateEvent -from platypush.message.request import Request -from platypush.utils import get_ssl_server_context, set_thread_name, \ - get_ip_or_hostname - -from .. import Backend -from .media.handlers import MediaHandler +from platypush.backend import Backend +from platypush.backend.http.app import app +from platypush.context import get_or_create_event_loop +from platypush.utils import get_ssl_server_context, set_thread_name class HttpBackend(Backend): @@ -47,6 +28,13 @@ class HttpBackend(Backend): * To display a fullscreen dashboard with your configured widgets, by default available under ``/dashboard`` + * To stream media over HTTP through the ``/media`` endpoint + + Any plugin can register custom routes under ``platypush/backend/http/app/routes/plugins``. + Any additional route is managed as a Flask blueprint template and the `.py` + module can expose lists of routes to the main webapp through the + ``__routes__`` object (a list of Flask blueprints). + Note that if you set up a main token, it will be required for any HTTP interaction - either as ``X-Token`` HTTP header, on the query string (attribute name: ``token``), as part of the JSON payload root (attribute @@ -62,21 +50,13 @@ class HttpBackend(Backend): support if you want to enable media streaming """ - hidden_plugins = { - 'assistant.google' - } + _DEFAULT_HTTP_PORT = 8008 + _DEFAULT_WEBSOCKET_PORT = 8009 - # Default size for the bytes chunk sent over the media streaming infra - _DEFAULT_STREAMING_CHUNK_SIZE = 4096 - - # Maximum range size to be sent through the media streamer if Range header - # is not set - _DEFAULT_STREAMING_BLOCK_SIZE = 3145728 - - def __init__(self, port=8008, websocket_port=8009, disable_websocket=False, - redis_queue='platypush/http', dashboard={}, resource_dirs={}, + def __init__(self, port=_DEFAULT_HTTP_PORT, + websocket_port=_DEFAULT_WEBSOCKET_PORT, + disable_websocket=False, dashboard={}, resource_dirs={}, ssl_cert=None, ssl_key=None, ssl_cafile=None, ssl_capath=None, - streaming_chunk_size=_DEFAULT_STREAMING_CHUNK_SIZE, maps={}, **kwargs): """ :param port: Listen port for the web server (default: 8008) @@ -88,9 +68,6 @@ class HttpBackend(Backend): :param disable_websocket: Disable the websocket interface (default: False) :type disable_websocket: bool - :param redis_queue: Name of the Redis queue used to synchronize messages with the web server process (default: ``platypush/http``) - :type redis_queue: str - :param ssl_cert: Set it to the path of your certificate file if you want to enable HTTPS (default: None) :type ssl_cert: str @@ -138,25 +115,17 @@ class HttpBackend(Backend): db: "sqlite:////home/blacklight/.local/share/platypush/feeds/rss.db" :type dashboard: dict - - :param streaming_chunk_size: Size for the chunks of bytes sent over the - media streaming infrastructure (default: 4096 bytes) - :type streaming_chunk_size: int """ super().__init__(**kwargs) self.port = port self.websocket_port = websocket_port - self.app = None - self.redis_queue = redis_queue self.dashboard = dashboard self.maps = maps self.server_proc = None self.disable_websocket = disable_websocket self.websocket_thread = None - self.redis_thread = None - self.redis = None self.resource_dirs = { name: os.path.abspath( os.path.expanduser(d)) for name, d in resource_dirs.items() } self.active_websockets = set() @@ -166,32 +135,15 @@ class HttpBackend(Backend): ssl_capath=ssl_capath) \ if ssl_cert else None - self.remote_base_url = '{proto}://{host}:{port}'.format( - proto=('https' if self.ssl_context else 'http'), - host=get_ip_or_hostname(), port=self.port) - - self.local_base_url = '{proto}://localhost:{port}'.format( - proto=('https' if self.ssl_context else 'http'), port=self.port) - - self._media_map_lock = threading.RLock() - def send_message(self, msg): self.logger.warning('Use cURL or any HTTP client to query the HTTP backend') - def stop(self): - """ Stop the web server """ + def on_stop(self): + """ On backend stop """ self.logger.info('Received STOP event on HttpBackend') - if self.redis_thread: - stop_evt = StopEvent(target=self.device_id, origin=self.device_id, - thread_id=self.redis_thread.ident) - - redis = self._get_redis() - if redis: - redis.rpush(self.redis_queue, str(stop_evt)) - if self.server_proc: self.server_proc.terminate() self.server_proc.join() @@ -216,516 +168,6 @@ class HttpBackend(Backend): self.active_websockets.remove(websocket) - def redis_poll(self): - """ Polls for new messages on the internal Redis queue """ - - while not self.should_stop(): - redis = self._get_redis() - if not redis: - continue - - msg = redis.blpop(self.redis_queue) - msg = Message.build(json.loads(msg[1].decode('utf-8'))) - self.on_message(msg) - - - @classmethod - def _authenticate(cls): - return Response('Authentication required', 401, - {'WWW-Authenticate': 'Basic realm="Login required"'}) - - @classmethod - def _authentication_ok(cls): - token = Config.get('token') - if not token: - return True - - user_token = None - - # Check if - if 'X-Token' in http_request.headers: - user_token = http_request.headers['X-Token'] - elif http_request.authorization: - # TODO support for user check - user_token = http_request.authorization.password - elif 'token' in http_request.args: - user_token = http_request.args.get('token') - else: - try: - args = json.loads(http_request.data.decode('utf-8')) - user_token = args.get('token') - except: - pass - - if user_token == token: - return True - - return False - - def webserver(self): - """ Web server main process """ - set_thread_name('WebServer') - - basedir = os.path.dirname(inspect.getfile(self.__class__)) - template_dir = os.path.join(basedir, 'templates') - media_map = {} - app = Flask(__name__, template_folder=template_dir) - - self.redis_thread = threading.Thread(target=self.redis_poll) - self.redis_thread.start() - - @app.route('/execute', methods=['POST']) - def execute(): - """ Endpoint to execute commands """ - if not self._authentication_ok(): return self._authenticate() - - args = json.loads(http_request.data.decode('utf-8')) - msg = Message.build(args) - self.logger.info('Received message on the HTTP backend: {}'.format(msg)) - - if Config.get('token'): - msg.token = Config.get('token') - - if isinstance(msg, Request): - msg.backend = self - msg.origin = 'http' - - self.bus.post(msg) - - if isinstance(msg, Request): - response = self.get_message_response(msg) - self.logger.info('Processing response on the HTTP backend: {}'.format(response)) - if response: - return Response(str(response), mimetype='application/json') - - @app.route('/') - def index(): - """ Route to the main web panel """ - if not self._authentication_ok(): return self._authenticate() - - configured_plugins = Config.get_plugins() - enabled_plugins = {} - hidden_plugins = {} - - for plugin, conf in configured_plugins.items(): - template_file = os.path.join('plugins', plugin + '.html') - if os.path.isfile(os.path.join(template_dir, template_file)): - if plugin in self.hidden_plugins: - hidden_plugins[plugin] = conf - else: - enabled_plugins[plugin] = conf - - return render_template('index.html', plugins=enabled_plugins, - hidden_plugins=hidden_plugins, utils=HttpUtils, - token=Config.get('token'), - websocket_port=self.websocket_port, - has_ssl=self.ssl_context is not None) - - - @app.route('/widget/', methods=['POST']) - def widget_update(widget): - """ ``POST /widget/`` will update the specified widget_id on the dashboard with the specified key-values """ - event = WidgetUpdateEvent( - widget=widget, **(json.loads(http_request.data.decode('utf-8')))) - - redis = self._get_redis() - if redis: - redis.rpush(self.redis_queue, str(event)) - return jsonify({ 'status': 'ok' }) - - @app.route('/resources/', methods=['GET']) - def static_path(path): - """ Static resources """ - base_path = os.path.dirname(path).split('/') - while base_path: - if os.sep.join(base_path) in self.resource_dirs: - break - base_path.pop() - - if not base_path: - abort(404) - - base_path = os.sep.join(base_path) - real_base_path = self.resource_dirs[base_path] - real_path = real_base_path - - file_path = [s for s in - re.sub(r'^{}(.*)$'.format(base_path), '\\1', path) \ - .split('/') if s] - - for p in file_path[:-1]: - real_path += os.sep + p - file_path.pop(0) - - file_path = file_path.pop(0) - if not real_path.startswith(real_base_path): - # Directory climbing attempt - abort(404) - - return send_from_directory(real_path, file_path) - - def get_media_url(media_id): - return '{url}/media/{media_id}'.format( - url=self.remote_base_url, media_id=media_id) - - def get_media_id(source): - return hashlib.sha1(source.encode()).hexdigest() - - def register_media(source, subtitles=None): - media_id = get_media_id(source) - media_url = get_media_url(media_id) - - with self._media_map_lock: - if media_id in media_map: - return media_map[media_id] - - subfile = None - - if subtitles: - try: - subfile = get_plugin('media.subtitles').download( - link=subtitles, convert_to_vtt=True - ).output.get('filename') - except Exception as e: - self.logger.warning('Unable to load subtitle {}: {}' - .format(subtitles, str(e))) - - media_hndl = MediaHandler.build(source, url=media_url, - subtitles=subfile) - - media_map[media_id] = media_hndl - media_hndl.media_id = media_id - - self.logger.info('Streaming "{}" on {}'.format(source, media_url)) - return media_hndl - - - def unregister_media(source): - if source is None: - raise KeyError('No media_id specified') - - media_id = get_media_id(source) - media_info = {} - - with self._media_map_lock: - if media_id not in media_map: - raise FileNotFoundError('{} is not a registered media_id'. - format(source)) - media_info = media_map.pop(media_id) - - self.logger.info('Unregistered {} from {}'.format( - source, media_info.get('url'))) - - return media_info - - - def stream_media(media_id, request): - media_hndl = media_map.get(media_id) - if not media_hndl: - raise FileNotFoundError('{} is not a registered media_id'. - format(media_id)) - - from_bytes = None - to_bytes = None - range_hdr = request.headers.get('range') - content_length = media_hndl.content_length - status_code = 200 - - headers = { - 'Accept-Ranges': 'bytes', - 'Content-Type': media_hndl.mime_type, - } - - if 'download' in request.args: - headers['Content-Disposition'] = 'attachment' + \ - ('; filename="{}"'.format(media_hndl.filename) if - media_hndl.filename else '') - - if range_hdr: - headers['Accept-Ranges'] = 'bytes' - from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-') - from_bytes = int(from_bytes) - - if not to_bytes: - to_bytes = content_length-1 - content_length -= from_bytes - else: - to_bytes = int(to_bytes) - content_length = to_bytes - from_bytes - - status_code = 206 - headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format( - start=from_bytes, end=to_bytes, - size=media_hndl.content_length) - else: - from_bytes = 0 - to_bytes = self._DEFAULT_STREAMING_BLOCK_SIZE - - headers['Content-Length'] = content_length - - if 'webplayer' in request.args: - return render_template('webplayer.html', - media_url=media_hndl.url.replace( - self.remote_base_url, ''), - media_type=media_hndl.mime_type, - subtitles_url='/media/subtitles/{}.vtt'. - format(media_id) if media_hndl.subtitles - else None) - else: - return Response(media_hndl.get_data( - from_bytes=from_bytes, to_bytes=to_bytes, - chunk_size=self._DEFAULT_STREAMING_CHUNK_SIZE), - status_code, headers=headers, mimetype=headers['Content-Type'], - direct_passthrough=True) - - - def add_subtitles(media_id, req): - """ - This route can be used to download and/or expose subtitles files - associated to a media file - """ - - media_hndl = media_map.get(media_id) - if not media_hndl: - raise FileNotFoundError('{} is not a registered media_id'. - format(media_id)) - - subfile = None - if req.data: - try: - subfile = json.loads(req.data.decode('utf-8')) \ - .get('filename') - if not subfile: - raise AttributeError - except Exception as e: - raise AttributeError(400, 'No filename in the request: {}' - .format(str(e))) - - if not subfile: - if not media_hndl.path: - raise NotImplementedError( - 'Subtitles are currently only supported for ' + - 'local media files') - - try: - subtitles = get_plugin('media.subtitles').get_subtitles( - media_hndl.path) - except Exception as e: - raise RuntimeError('Could not get subtitles: {}'. - format(str(e))) - - if not subtitles: - raise FileNotFoundError( - 'No subtitles found for resource {}'.format( - media_hndl.path)) - - subfile = get_plugin('media.subtitles').download( - link=subtitles[0].get('SubDownloadLink'), - media_resource=media_hndl.path, - convert_to_vtt=True).get('filename') - - media_hndl.set_subtitles(subfile) - return { - 'filename': subfile, - 'url': self.remote_base_url + '/media/subtitles/' + media_id + '.vtt', - } - - def remove_subtitles(media_id): - media_hndl = media_map.get(media_id) - if not media_hndl: - raise FileNotFoundError('{} is not a registered media_id'. - format(media_id)) - - if not media_hndl.subtitles: - raise FileNotFoundError('{} has no subtitles attached'. - format(media_id)) - - media_hndl.remove_subtitles() - return {} - - - @app.route('/media/subtitles/.vtt', methods=['GET', 'POST', 'DELETE']) - def handle_subtitles(media_id): - """ - This route can be used to download and/or expose subtitle files - associated to a media file - """ - - if http_request.method == 'GET': - media_hndl = media_map.get(media_id) - if not media_hndl: - abort(404, 'No such media') - - if not media_hndl.subtitles: - abort(404, 'The media has no subtitles attached') - - return send_from_directory( - os.path.dirname(media_hndl.subtitles), - os.path.basename(media_hndl.subtitles), - mimetype='text/vtt') - - try: - if http_request.method == 'DELETE': - return jsonify(remove_subtitles(media_id)) - else: - return jsonify(add_subtitles(media_id, http_request)) - except FileNotFoundError as e: - abort(404, str(e)) - except AttributeError as e: - abort(400, str(e)) - except NotImplementedError as e: - abort(422, str(e)) - except Exception as e: - abort(500, str(e)) - - @app.route('/media', methods=['GET', 'PUT']) - def add_or_get_media(): - """ - This route can be used by the `media` plugin to add streaming - content over HTTP or to get the list of registered streams - """ - - if http_request.method == 'GET': - return jsonify([dict(media) for media in media_map.values()]) - - args = {} - try: - args = json.loads(http_request.data.decode('utf-8')) - except: - abort(400, 'Invalid JSON request') - - source = args.get('source') - if not source: - abort(400, 'The request does not contain any source') - - subtitles = args.get('subtitles') - try: - media_hndl = register_media(source, subtitles) - ret = dict(media_hndl) - if media_hndl.subtitles: - ret['subtitles_url'] = self.remote_base_url + \ - '/media/subtitles/' + media_hndl.media_id + '.vtt' - return jsonify(ret) - except FileNotFoundError as e: - abort(404, str(e)) - except AttributeError as e: - abort(400, str(e)) - except Exception as e: - self.logger.exception(e) - abort(500, str(e)) - - @app.route('/media/', methods=['GET', 'DELETE']) - def stream_or_delete_media(media_id): - """ - This route can be used to stream active media points or unregister - a mounted media stream - """ - - # Remove the extension - media_id = '.'.join(media_id.split('.')[:-1]) - - try: - if http_request.method == 'GET': - if media_id is None: - return jsonify(media_map) - else: - return stream_media(media_id, http_request) - else: - media_info = unregister_media(media_id) - return jsonify(media_info) - except (AttributeError, FileNotFoundError) as e: - abort(404, str(e)) - except KeyError as e: - abort(400, str(e)) - except Exception as e: - self.logger.exception(e) - abort(500, str(e)) - - @app.route('/dashboard', methods=['GET']) - def dashboard(): - """ Route for the fullscreen dashboard """ - if not self._authentication_ok(): return self._authenticate() - - return render_template('dashboard.html', config=self.dashboard, utils=HttpUtils, - token=Config.get('token'), websocket_port=self.websocket_port) - - @app.route('/map', methods=['GET']) - def map(): - """ - Query parameters: - start -- Map timeline start timestamp - end -- Map timeline end timestamp - zoom -- Between 1-20. Set it if you want to override the - Google's API auto-zoom. You may have to set it if you are - trying to embed the map into an iframe - - Supported values for `start` and `end`: - - now - - yesterday - - -30s (it means '30 seconds ago') - - -10m (it means '10 minutes ago') - - -24h (it means '24 hours ago') - - -7d (it means '7 days ago') - - 2018-06-04T17:39:22.742Z (ISO strings) - - Default: start=yesterday, end=now - """ - - def parse_time(time_string): - if not time_string: - return None - - now = datetime.datetime.now() - - if time_string == 'now': - return now.isoformat() - if time_string == 'yesterday': - return (now - datetime.timedelta(days=1)).isoformat() - - try: - return dateutil.parser.parse(time_string).isoformat() - except ValueError: - pass - - m = re.match('([-+]?)([0-9]+)([dhms])', time_string) - if not m: - raise RuntimeError('Invalid time interval string representation: "{}"'. - format(time_string)) - - time_delta = (-1 if m.group(1) == '-' else 1) * int(m.group(2)) - time_unit = m.group(3) - - if time_unit == 'd': - params = { 'days': time_delta } - elif time_unit == 'h': - params = { 'hours': time_delta } - elif time_unit == 'm': - params = { 'minutes': time_delta } - elif time_unit == 's': - params = { 'seconds': time_delta } - - return (now + datetime.timedelta(**params)).isoformat() - - - if not self._authentication_ok(): return self._authenticate() - - try: - api_key = self.maps['api_key'] - except KeyError: - raise RuntimeError('Google Maps api_key not set in the maps configuration') - - start = parse_time(http_request.args.get('start', default='yesterday')) - end = parse_time(http_request.args.get('end', default='now')) - zoom = http_request.args.get('zoom', default=None) - - return render_template('map.html', config=self.maps, - utils=HttpUtils, start=start, end=end, - zoom=zoom, token=Config.get('token'), api_key=api_key, - websocket_port=self.websocket_port) - - return app - - def websocket(self): """ Websocket main server """ import websockets @@ -754,120 +196,36 @@ class HttpBackend(Backend): **websocket_args)) loop.run_forever() + def _start_web_server(self): + def proc(): + kwargs = { + 'host': '0.0.0.0', + 'port': self.port, + 'use_reloader': False, + 'debug': False, + } + + if self.ssl_context: + kwargs['ssl_context'] = self.ssl_context + + app.run(**kwargs) + + return proc + + def run(self): super().run() - os.putenv('FLASK_APP', 'platypush') - os.putenv('FLASK_ENV', 'production') - kwargs = { - 'host':'0.0.0.0', 'port':self.port, 'use_reloader':False - } - if self.ssl_context: - kwargs['ssl_context'] = self.ssl_context - - self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) - - self.app = self.webserver() - self.app.debug = False - self.server_proc = Process(target=self.app.run, - name='WebServer', - kwargs=kwargs) + self.server_proc = Process(target=self._start_web_server(), + name='WebServer') self.server_proc.start() if not self.disable_websocket: self.websocket_thread = threading.Thread(target=self.websocket) self.websocket_thread.start() + self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) self.server_proc.join() -class HttpUtils(object): - @staticmethod - def widget_columns_to_html_class(columns): - if not isinstance(columns, int): - raise RuntimeError('columns should be a number, got "{}"'.format(columns)) - - if columns == 1: - return 'one column' - elif columns == 2: - return 'two columns' - elif columns == 3: - return 'three columns' - elif columns == 4: - return 'four columns' - elif columns == 5: - return 'five columns' - elif columns == 6: - return 'six columns' - elif columns == 7: - return 'seven columns' - elif columns == 8: - return 'eight columns' - elif columns == 9: - return 'nine columns' - elif columns == 10: - return 'ten columns' - elif columns == 11: - return 'eleven columns' - elif columns == 12: - return 'twelve columns' - else: - raise RuntimeError('Constraint violation: should be 1 <= columns <= 12, ' + - 'got columns={}'.format(columns)) - - @staticmethod - def search_directory(directory, *extensions, recursive=False): - files = [] - - if recursive: - for root, subdirs, files in os.walk(directory): - for file in files: - if not extensions or os.path.splitext(file)[1].lower() in extensions: - files.append(os.path.join(root, file)) - else: - for file in os.listdir(directory): - if not extensions or os.path.splitext(file)[1].lower() in extensions: - files.append(os.path.join(directory, file)) - - return files - - @classmethod - def search_web_directory(cls, directory, *extensions): - directory = os.path.abspath(os.path.expanduser(directory)) - resource_dirs = get_backend('http').resource_dirs - resource_path = None - uri = '' - - for name, resource_path in resource_dirs.items(): - if directory.startswith(resource_path): - subdir = re.sub('^{}(.*)$'.format(resource_path), - '\\1', directory) - uri = '/resources/' + name - break - - if not uri: - raise RuntimeError('Directory {} not found among the available ' + - 'static resources on the webserver'.format( - directory)) - - results = [ - re.sub('^{}(.*)$'.format(resource_path), uri + '\\1', path) - for path in cls.search_directory(directory, *extensions) - ] - - return results - - @classmethod - def to_json(cls, data): - return json.dumps(data) - - @classmethod - def from_json(cls, data): - return json.loads(data) - - @classmethod - def get_config(cls, attr): - return Config.get(attr) - - # vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/__init__.py b/platypush/backend/http/app/__init__.py new file mode 100644 index 0000000000..350622e476 --- /dev/null +++ b/platypush/backend/http/app/__init__.py @@ -0,0 +1,24 @@ +import os + +from flask import Flask + +from platypush.backend.http.utils import HttpUtils +from platypush.backend.http.app.utils import get_routes + + +## Webapp initialization + +base_folder = os.path.abspath(os.path.join( + os.path.dirname(os.path.abspath(__file__)), '..')) + +template_folder = os.path.join(base_folder, 'templates') +static_folder = os.path.join(base_folder, 'static') + +app = Flask('platypush', template_folder=template_folder, + static_folder=static_folder) + +for route in get_routes(): + app.register_blueprint(route) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/__init__.py b/platypush/backend/http/app/routes/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/platypush/backend/http/app/routes/dashboard.py b/platypush/backend/http/app/routes/dashboard.py new file mode 100644 index 0000000000..e129c79cf9 --- /dev/null +++ b/platypush/backend/http/app/routes/dashboard.py @@ -0,0 +1,30 @@ +from flask import Blueprint, request, render_template + +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.utils import authenticate, authentication_ok, \ + get_websocket_port + +from platypush.backend.http.utils import HttpUtils +from platypush.config import Config + +dashboard = Blueprint('dashboard', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + dashboard, +] + +@dashboard.route('/dashboard', methods=['GET']) +def dashboard(): + """ Route for the fullscreen dashboard """ + if not authentication_ok(request): return authenticate() + + http_conf = Config.get('backend.http') + dashboard_conf = http_conf.get('dashboard', {}) + + return render_template('dashboard.html', config=dashboard_conf, + utils=HttpUtils, token=Config.get('token'), + websocket_port=get_websocket_port()) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/execute.py b/platypush/backend/http/app/routes/execute.py new file mode 100644 index 0000000000..ed15b1c1ed --- /dev/null +++ b/platypush/backend/http/app/routes/execute.py @@ -0,0 +1,36 @@ +import json + +from flask import Blueprint, abort, request, Response + +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.utils import authenticate, authentication_ok, \ + logger, send_message + +from platypush.backend.http.utils import HttpUtils + + +execute = Blueprint('execute', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + execute, +] + +@execute.route('/execute', methods=['POST']) +def execute(): + """ Endpoint to execute commands """ + if not authentication_ok(request): return authenticate() + + msg = json.loads(request.data.decode('utf-8')) + logger().info('Received message on the HTTP backend: {}'.format(msg)) + + try: + response = send_message(msg) + return Response(str(response or {}), mimetype='application/json') + except Exception as e: + logger().error('Error while running HTTP action: {}. Request: {}'. + format(str(e), msg)) + abort(500, str(e)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/index.py b/platypush/backend/http/app/routes/index.py new file mode 100644 index 0000000000..86d6a1fb42 --- /dev/null +++ b/platypush/backend/http/app/routes/index.py @@ -0,0 +1,52 @@ +import os + +from flask import Blueprint, request, render_template + +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.utils import authenticate, authentication_ok, \ + get_websocket_port + +from platypush.backend.http.utils import HttpUtils +from platypush.config import Config + + +index = Blueprint('index', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + index, +] + +@index.route('/') +def index(): + """ Route to the main web panel """ + if not authentication_ok(request): return authenticate() + + # These plugins have their own template file but won't be shown as a tab in + # the web panel. This is usually the case for plugins that only include JS + # code but no template content. + _hidden_plugins = { + 'assistant.google' + } + + configured_plugins = Config.get_plugins() + enabled_plugins = {} + hidden_plugins = {} + + for plugin, conf in configured_plugins.items(): + template_file = os.path.join('plugins', plugin + '.html') + if os.path.isfile(os.path.join(template_folder, template_file)): + if plugin in _hidden_plugins: + hidden_plugins[plugin] = conf + else: + enabled_plugins[plugin] = conf + + http_conf = Config.get('backend.http') + return render_template('index.html', plugins=enabled_plugins, + hidden_plugins=hidden_plugins, utils=HttpUtils, + token=Config.get('token'), + websocket_port=get_websocket_port(), + has_ssl=http_conf.get('ssl_cert') is not None) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/map.py b/platypush/backend/http/app/routes/map.py new file mode 100644 index 0000000000..d6e10ca7de --- /dev/null +++ b/platypush/backend/http/app/routes/map.py @@ -0,0 +1,95 @@ +import datetime +import dateutil.parser + +from flask import abort, request, render_template, Blueprint + +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.utils import authenticate, authentication_ok +from platypush.config import Config + + +map_ = Blueprint('map', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + map_, +] + +def parse_time(time_string): + if not time_string: + return None + + now = datetime.datetime.now() + + if time_string == 'now': + return now.isoformat() + if time_string == 'yesterday': + return (now - datetime.timedelta(days=1)).isoformat() + + try: + return dateutil.parser.parse(time_string).isoformat() + except ValueError: + pass + + m = re.match('([-+]?)([0-9]+)([dhms])', time_string) + if not m: + raise RuntimeError('Invalid time interval string representation: "{}"'. + format(time_string)) + + time_delta = (-1 if m.group(1) == '-' else 1) * int(m.group(2)) + time_unit = m.group(3) + + if time_unit == 'd': + params = { 'days': time_delta } + elif time_unit == 'h': + params = { 'hours': time_delta } + elif time_unit == 'm': + params = { 'minutes': time_delta } + elif time_unit == 's': + params = { 'seconds': time_delta } + + return (now + datetime.timedelta(**params)).isoformat() + + +@map_.route('/map', methods=['GET']) +def map(): + """ + Query parameters: + start -- Map timeline start timestamp + end -- Map timeline end timestamp + zoom -- Between 1-20. Set it if you want to override the + Google's API auto-zoom. You may have to set it if you are + trying to embed the map into an iframe + + Supported values for `start` and `end`: + - now + - yesterday + - -30s (it means '30 seconds ago') + - -10m (it means '10 minutes ago') + - -24h (it means '24 hours ago') + - -7d (it means '7 days ago') + - 2018-06-04T17:39:22.742Z (ISO strings) + + Default: start=yesterday, end=now + """ + + if not authentication_ok(request): return authenticate() + map_conf = (Config.get('backend.http') or {}).get('maps', {}) + if not map_conf: + abort(500, 'The maps plugin is not configured in backend.http') + + api_key = map_conf.get('api_key') + if not api_key: + abort(500, 'Google Maps api_key not set in the maps configuration') + + start = parse_time(request.args.get('start', default='yesterday')) + end = parse_time(request.args.get('end', default='now')) + zoom = request.args.get('zoom', default=None) + + return render_template('map.html', config=map_conf, utils=HttpUtils, + start=start, end=end, zoom=zoom, + token=Config.get('token'), api_key=api_key, + websocket_port=get_websocket_port()) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/plugins/media/__init__.py b/platypush/backend/http/app/routes/plugins/media/__init__.py new file mode 100644 index 0000000000..c41f5ea490 --- /dev/null +++ b/platypush/backend/http/app/routes/plugins/media/__init__.py @@ -0,0 +1,217 @@ +import hashlib +import json +import threading + +from flask import Response, render_template + +from platypush.backend.http.app.utils import get_remote_base_url, logger, \ + send_message + +from platypush.backend.http.media.handlers import MediaHandler + +media_map = {} +media_map_lock = threading.RLock() + +# Size for the bytes chunk sent over the media streaming infra +STREAMING_CHUNK_SIZE = 4096 + +# Maximum range size to be sent through the media streamer if Range header +# is not set +STREAMING_BLOCK_SIZE = 3145728 + + +def get_media_url(media_id): + return '{url}/media/{media_id}'.format( + url=get_remote_base_url(), media_id=media_id) + +def get_media_id(source): + return hashlib.sha1(source.encode()).hexdigest() + +def register_media(source, subtitles=None): + global media_map, media_map_lock + + media_id = get_media_id(source) + media_url = get_media_url(media_id) + + with media_map_lock: + if media_id in media_map: + return media_map[media_id] + + subfile = None + if subtitles: + req = { + 'type': 'request', + 'action': 'media.subtitles.download', + 'args': { + 'link': subtitles, + 'convert_to_vtt': True, + } + } + + try: + subfile = (send_message(req).output or {}).get('filename') + except Exception as e: + logger().warning('Unable to load subtitle {}: {}' + .format(subtitles, str(e))) + + with media_map_lock: + media_hndl = MediaHandler.build(source, url=media_url, subtitles=subfile) + media_map[media_id] = media_hndl + media_hndl.media_id = media_id + + logger().info('Streaming "{}" on {}'.format(source, media_url)) + return media_hndl + + +def unregister_media(source): + global media_map, media_map_lock + + if source is None: + raise KeyError('No media_id specified') + + media_id = get_media_id(source) + media_info = {} + + with media_map_lock: + if media_id not in media_map: + raise FileNotFoundError('{} is not a registered media_id'. + format(source)) + media_info = media_map.pop(media_id) + + logger().info('Unregistered {} from {}'.format(source, media_info.get('url'))) + return media_info + + +def stream_media(media_id, req): + global STREAMING_BLOCK_SIZE, STREAMING_CHUNK_SIZE + + media_hndl = media_map.get(media_id) + if not media_hndl: + raise FileNotFoundError('{} is not a registered media_id'.format(media_id)) + + from_bytes = None + to_bytes = None + range_hdr = req.headers.get('range') + content_length = media_hndl.content_length + status_code = 200 + + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Type': media_hndl.mime_type, + } + + if 'download' in req.args: + headers['Content-Disposition'] = 'attachment' + \ + ('; filename="{}"'.format(media_hndl.filename) if + media_hndl.filename else '') + + if range_hdr: + headers['Accept-Ranges'] = 'bytes' + from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-') + from_bytes = int(from_bytes) + + if not to_bytes: + to_bytes = content_length-1 + content_length -= from_bytes + else: + to_bytes = int(to_bytes) + content_length = to_bytes - from_bytes + + status_code = 206 + headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format( + start=from_bytes, end=to_bytes, + size=media_hndl.content_length) + else: + from_bytes = 0 + to_bytes = STREAMING_BLOCK_SIZE + + headers['Content-Length'] = content_length + + if 'webplayer' in req.args: + return render_template('webplayer.html', + media_url=media_hndl.url.replace( + get_remote_base_url(), ''), + media_type=media_hndl.mime_type, + subtitles_url='/media/subtitles/{}.vtt'. + format(media_id) if media_hndl.subtitles + else None) + else: + return Response(media_hndl.get_data( + from_bytes=from_bytes, to_bytes=to_bytes, + chunk_size=STREAMING_CHUNK_SIZE), + status_code, headers=headers, mimetype=headers['Content-Type'], + direct_passthrough=True) + + +def add_subtitles(media_id, req): + """ + This route can be used to download and/or expose subtitles files + associated to a media file + """ + + media_hndl = media_map.get(media_id) + if not media_hndl: + raise FileNotFoundError('{} is not a registered media_id'.format(media_id)) + + subfile = None + if req.data: + subfile = json.loads(req.data.decode('utf-8')).get('filename') + if not subfile: + raise AttributeError('No filename specified in the request') + + if not subfile: + if not media_hndl.path: + raise NotImplementedError( + 'Subtitles are currently only supported for local media files') + + req = { + 'type': 'request', + 'action': 'media.subtitles.get_subtitles', + 'args': { + 'resource': media_hndl.path, + } + } + + try: + subtitles = send_message(req).output or [] + except Exception as e: + raise RuntimeError('Could not get subtitles: {}'.format(str(e))) + + if not subtitles: + raise FileNotFoundError('No subtitles found for resource {}'. + format(media_hndl.path)) + + req = { + 'type': 'request', + 'action': 'media.subtitles.download', + 'args': { + 'link': subtitles[0].get('SubDownloadLink'), + 'media_resource': media_hndl.path, + 'convert_to_vtt': True, + } + } + + subfile = (send_message(req).output or {}).get('filename') + + media_hndl.set_subtitles(subfile) + return { + 'filename': subfile, + 'url': get_remote_base_url() + '/media/subtitles/' + media_id + '.vtt', + } + +def remove_subtitles(media_id): + media_hndl = media_map.get(media_id) + if not media_hndl: + raise FileNotFoundError('{} is not a registered media_id'. + format(media_id)) + + if not media_hndl.subtitles: + raise FileNotFoundError('{} has no subtitles attached'. + format(media_id)) + + media_hndl.remove_subtitles() + return {} + + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/plugins/media/stream.py b/platypush/backend/http/app/routes/plugins/media/stream.py new file mode 100644 index 0000000000..e71f6f8868 --- /dev/null +++ b/platypush/backend/http/app/routes/plugins/media/stream.py @@ -0,0 +1,86 @@ +import json + +from flask import abort, jsonify, request, Blueprint + +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.utils import logger, get_remote_base_url +from platypush.backend.http.app.routes.plugins.media import media_map, \ + stream_media, register_media, unregister_media + +media = Blueprint('media', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + media, +] + +@media.route('/media', methods=['GET']) +def get_media(): + """ + This route can be used to get the list of registered streams + """ + return jsonify([dict(media) for media in media_map.values()]) + + +@media.route('/media', methods=['PUT']) +def add_media(): + """ + This route can be used by the `media` plugin to add streaming content over HTTP + """ + + args = {} + try: + args = json.loads(request.data.decode('utf-8')) + except: + abort(400, 'Invalid JSON request') + + source = args.get('source') + if not source: + abort(400, 'The request does not contain any source') + + subtitles = args.get('subtitles') + try: + media_hndl = register_media(source, subtitles) + ret = dict(media_hndl) + if media_hndl.subtitles: + ret['subtitles_url'] = get_remote_base_url() + \ + '/media/subtitles/' + media_hndl.media_id + '.vtt' + return jsonify(ret) + except FileNotFoundError as e: + abort(404, str(e)) + except AttributeError as e: + abort(400, str(e)) + except Exception as e: + logger().exception(e) + abort(500, str(e)) + + +@media.route('/media/', methods=['GET', 'DELETE']) +def stream_or_delete_media(media_id): + """ + This route can be used to stream active media points or unregister + a mounted media stream + """ + + # Remove the extension + media_id = '.'.join(media_id.split('.')[:-1]) + + try: + if request.method == 'GET': + if media_id is None: + return jsonify([dict(media) for media in media_map.values()]) + else: + return stream_media(media_id, request) + else: + media_info = unregister_media(media_id) + return jsonify(media_info) + except (AttributeError, FileNotFoundError) as e: + abort(404, str(e)) + except KeyError as e: + abort(400, str(e)) + except Exception as e: + logger().exception(e) + abort(500, str(e)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/plugins/media/subtitles.py b/platypush/backend/http/app/routes/plugins/media/subtitles.py new file mode 100644 index 0000000000..3a6fff0107 --- /dev/null +++ b/platypush/backend/http/app/routes/plugins/media/subtitles.py @@ -0,0 +1,51 @@ +import os + +from flask import abort, jsonify, request, send_from_directory, Blueprint + +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.routes.plugins.media import media_map, \ + remove_subtitles, add_subtitles + +subtitles = Blueprint('subtitles', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + subtitles, +] + +@subtitles.route('/media/subtitles/.vtt', methods=['GET', 'POST', 'DELETE']) +def handle_subtitles(media_id): + """ + This route can be used to download and/or expose subtitle files + associated to a media file + """ + + if request.method == 'GET': + media_hndl = media_map.get(media_id) + if not media_hndl: + abort(404, 'No such media') + + if not media_hndl.subtitles: + abort(404, 'The media has no subtitles attached') + + return send_from_directory( + os.path.dirname(media_hndl.subtitles), + os.path.basename(media_hndl.subtitles), + mimetype='text/vtt') + + try: + if request.method == 'DELETE': + return jsonify(remove_subtitles(media_id)) + else: + return jsonify(add_subtitles(media_id, request)) + except FileNotFoundError as e: + abort(404, str(e)) + except AttributeError as e: + abort(400, str(e)) + except NotImplementedError as e: + abort(422, str(e)) + except Exception as e: + abort(500, str(e)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/resources.py b/platypush/backend/http/app/routes/resources.py new file mode 100644 index 0000000000..8b04ddd2c4 --- /dev/null +++ b/platypush/backend/http/app/routes/resources.py @@ -0,0 +1,55 @@ +import os +import re + +from flask import Blueprint, abort, send_from_directory + +from platypush.config import Config +from platypush.backend.http.app import template_folder +from platypush.backend.http.app.utils import authenticate, authentication_ok, \ + send_message + + +resources = Blueprint('resources', __name__, template_folder=template_folder) + +# Declare routes list +__routes__ = [ + resources, +] + +@resources.route('/resources/', methods=['GET']) +def resources_path(path): + """ Custom static resources """ + path_tokens = path.split('/') + filename = path_tokens.pop(-1) + http_conf = Config.get('backend.http') + resource_dirs = http_conf.get('resource_dirs', {}) + + while path_tokens: + if '/'.join(path_tokens) in resource_dirs: + break + path_tokens.pop() + + if not path_tokens: + # Requested resource not found in the allowed resource_dirs + abort(404) + + base_path = '/'.join(path_tokens) + real_base_path = os.path.abspath(os.path.expanduser(resource_dirs[base_path])) + real_path = real_base_path + + file_path = [s for s in re.sub(r'^{}(.*)$'.format(base_path), '\\1', path) + .split('/') if s] + + for p in file_path[:-1]: + real_path += os.sep + p + file_path.pop(0) + + file_path = file_path.pop(0) + if not real_path.startswith(real_base_path): + # Directory climbing attempt + abort(404) + + return send_from_directory(real_path, file_path) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/utils.py b/platypush/backend/http/app/utils.py new file mode 100644 index 0000000000..e4f4b6d3f3 --- /dev/null +++ b/platypush/backend/http/app/utils.py @@ -0,0 +1,155 @@ +import importlib +import json +import logging +import os +import sys + +from flask import Response +from redis import Redis + +# NOTE: The HTTP service will *only* work on top of a Redis bus. The default +# internal bus service won't work as the web server will run in a different process. +from platypush.bus.redis import RedisBus + +from platypush.config import Config +from platypush.message import Message +from platypush.message.request import Request +from platypush.utils import get_redis_queue_name_by_message, get_ip_or_hostname + +_bus = None +_logger = None + + +def bus(): + global _bus + if _bus is None: + _bus = RedisBus() + return _bus + +def logger(): + global _logger + if not _logger: + log_args = { + 'level': logging.INFO, + 'format': '%(asctime)-15s|%(levelname)5s|%(name)s|%(message)s', + } + + level = (Config.get('backend.http') or {}).get('logging') or \ + (Config.get('logging') or {}).get('level') + filename = (Config.get('backend.http') or {}).get('filename') + + if level: + log_args['level'] = getattr(logging, level.upper()) \ + if isinstance(level, str) else level + if filename: + log_args['filename'] = filename + + logging.basicConfig(**log_args) + _logger = logging.getLogger('platyweb') + + return _logger + +def get_message_response(msg): + redis = Redis(**bus().redis_args) + response = redis.blpop(get_redis_queue_name_by_message(msg), timeout=60) + if response and len(response) > 1: + response = Message.build(response[1]) + else: + response = None + + return response + +def get_http_port(): + from platypush.backend.http import HttpBackend + http_conf = Config.get('backend.http') + return http_conf.get('port', HttpBackend._DEFAULT_HTTP_PORT) + +def get_websocket_port(): + from platypush.backend.http import HttpBackend + http_conf = Config.get('backend.http') + return http_conf.get('websocket_port', HttpBackend._DEFAULT_WEBSOCKET_PORT) + +def send_message(msg): + msg = Message.build(msg) + + if isinstance(msg, Request): + msg.origin = 'http' + + if Config.get('token'): + msg.token = Config.get('token') + + bus().post(msg) + + if isinstance(msg, Request): + response = get_message_response(msg) + logger().info('Processing response on the HTTP backend: {}'. + format(response)) + + return response + + +def authenticate(): + return Response('Authentication required', 401, + {'WWW-Authenticate': 'Basic realm="Login required"'}) + +def authentication_ok(req): + token = Config.get('token') + if not token: + return True + + user_token = None + + # Check if + if 'X-Token' in req.headers: + user_token = req.headers['X-Token'] + elif req.authorization: + # TODO support for user check + user_token = req.authorization.password + elif 'token' in req.args: + user_token = req.args.get('token') + else: + try: + args = json.loads(req.data.decode('utf-8')) + user_token = args.get('token') + except: + pass + + if user_token == token: + return True + + return False + +def get_routes(): + routes_dir = os.path.join( + os.path.dirname(os.path.abspath(__file__)), 'routes') + routes = [] + + for path, dirs, files in os.walk(routes_dir): + for f in files: + if f.endswith('.py'): + sys.path.insert(0, path) + try: + mod = importlib.import_module('.'.join(f.split('.')[:-1])) + if hasattr(mod, '__routes__'): + routes.extend(mod.__routes__) + except Exception as e: + logger().warning('Could not import routes from {}/{}: {}: {}'. + format(path, f, type(e), str(e))) + + return routes + + +def get_local_base_url(): + http_conf = Config.get('backend.http') or {} + return '{proto}://localhost:{port}'.format( + proto=('https' if http_conf.get('ssl_cert') else 'http'), + port=get_http_port()) + +def get_remote_base_url(): + http_conf = Config.get('backend.http') or {} + return '{proto}://{host}:{port}'.format( + proto=('https' if http_conf.get('ssl_cert') else 'http'), + host=get_ip_or_hostname(), port=get_http_port()) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/utils.py b/platypush/backend/http/utils.py new file mode 100644 index 0000000000..da09a1894f --- /dev/null +++ b/platypush/backend/http/utils.py @@ -0,0 +1,97 @@ +import json +import os +import re + +from platypush.config import Config + +class HttpUtils(object): + @staticmethod + def widget_columns_to_html_class(columns): + if not isinstance(columns, int): + raise RuntimeError('columns should be a number, got "{}"'.format(columns)) + + if columns == 1: + return 'one column' + elif columns == 2: + return 'two columns' + elif columns == 3: + return 'three columns' + elif columns == 4: + return 'four columns' + elif columns == 5: + return 'five columns' + elif columns == 6: + return 'six columns' + elif columns == 7: + return 'seven columns' + elif columns == 8: + return 'eight columns' + elif columns == 9: + return 'nine columns' + elif columns == 10: + return 'ten columns' + elif columns == 11: + return 'eleven columns' + elif columns == 12: + return 'twelve columns' + else: + raise RuntimeError('Constraint violation: should be 1 <= columns <= 12, ' + + 'got columns={}'.format(columns)) + + @staticmethod + def search_directory(directory, *extensions, recursive=False): + files = [] + + if recursive: + for root, subdirs, files in os.walk(directory): + for file in files: + if not extensions or os.path.splitext(file)[1].lower() in extensions: + files.append(os.path.join(root, file)) + else: + for file in os.listdir(directory): + if not extensions or os.path.splitext(file)[1].lower() in extensions: + files.append(os.path.join(directory, file)) + + return files + + @classmethod + def search_web_directory(cls, directory, *extensions): + directory = os.path.abspath(os.path.expanduser(directory)) + resource_dirs = (Config.get('backend.http') or {}).get('resource_dirs', {}) + resource_path = None + uri = '' + + for name, resource_path in resource_dirs.items(): + resource_path = os.path.abspath(os.path.expanduser(resource_path)) + if directory.startswith(resource_path): + subdir = re.sub('^{}(.*)$'.format(resource_path), + '\\1', directory) + uri = '/resources/' + name + break + + if not uri: + raise RuntimeError(('Directory {} not found among the available ' + + 'static resources on the webserver').format( + directory)) + + results = [ + re.sub('^{}(.*)$'.format(resource_path), uri + '\\1', path) + for path in cls.search_directory(directory, *extensions) + ] + + return results + + @classmethod + def to_json(cls, data): + return json.dumps(data) + + @classmethod + def from_json(cls, data): + return json.loads(data) + + @classmethod + def get_config(cls, attr): + return Config.get(attr) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/backend/websocket.py b/platypush/backend/websocket.py index caddeda05e..718d3f6a30 100644 --- a/platypush/backend/websocket.py +++ b/platypush/backend/websocket.py @@ -8,6 +8,7 @@ from platypush.backend import Backend from platypush.context import get_plugin, get_or_create_event_loop from platypush.message import Message from platypush.message.request import Request +from platypush.message.response import Response from platypush.utils import get_ssl_server_context @@ -23,8 +24,11 @@ class WebsocketBackend(Backend): # Websocket client message recv timeout in seconds _websocket_client_timeout = 0 - def __init__(self, port=8765, bind_address='0.0.0.0', ssl_cafile=None, - ssl_capath=None, ssl_cert=None, ssl_key=None, + # Default websocket service port + _default_websocket_port = 8765 + + def __init__(self, port=_default_websocket_port, bind_address='0.0.0.0', + ssl_cafile=None, ssl_capath=None, ssl_cert=None, ssl_key=None, client_timeout=_websocket_client_timeout, **kwargs): """ :param port: Listen port for the websocket server (default: 8765) @@ -116,26 +120,22 @@ class WebsocketBackend(Backend): self.on_message(msg) if isinstance(msg, Request): - response = self.get_message_response(msg) - if not response: - return - + response = self.get_message_response(msg) or Response() self.logger.info('Processing response on the websocket backend: {}'. - format(response)) + format(response)) await websocket.send(str(response)) + except websockets.exceptions.ConnectionClosed as e: + self.active_websockets.remove(websocket) + self.logger.debug('Websocket client {} closed connection'. + format(websocket.remote_address[0])) + except asyncio.TimeoutError as e: + self.active_websockets.remove(websocket) + self.logger.debug('Websocket connection to {} timed out'. + format(websocket.remote_address[0])) except Exception as e: - if isinstance(e, websockets.exceptions.ConnectionClosed): - self.active_websockets.remove(websocket) - self.logger.debug('Websocket client {} closed connection'. - format(websocket.remote_address[0])) - elif isinstance(e, asyncio.TimeoutError): - self.active_websockets.remove(websocket) - self.logger.debug('Websocket connection to {} timed out'. - format(websocket.remote_address[0])) - else: - self.logger.exception(e) + self.logger.exception(e) self.logger.info('Initialized websocket backend on port {}, bind address: {}'. format(self.port, self.bind_address)) diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index 661f7fbc9f..3357328368 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -5,6 +5,7 @@ import threading from redis import Redis from platypush.bus import Bus +from platypush.config import Config from platypush.message import Message logger = logging.getLogger(__name__) @@ -17,7 +18,12 @@ class RedisBus(Bus): def __init__(self, on_message=None, redis_queue=_DEFAULT_REDIS_QUEUE, *args, **kwargs): super().__init__(on_message=on_message) + + if not args and not kwargs: + kwargs = (Config.get('backend.redis') or {}).get('redis_args', {}) + self.redis = Redis(*args, **kwargs) + self.redis_args = kwargs self.redis_queue = redis_queue self.on_message = on_message self.thread_id = threading.get_ident() diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 44dbeb8511..09a865d1c8 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -217,9 +217,7 @@ class Request(Message): # Retry mechanism plugin.logger.exception(e) logger.warning(('Uncaught exception while processing response ' + - 'from action [{}] from plugin {}: {}').format( - self.action, plugin.__class__.__name__, - str(e))) + 'from action [{}]: {}').format(self.action, str(e))) errors = errors or [] if str(e) not in errors: