diff --git a/platypush/backend/http/app/routes/plugins/media/__init__.py b/platypush/backend/http/app/routes/plugins/media/__init__.py deleted file mode 100644 index 67d82ca611..0000000000 --- a/platypush/backend/http/app/routes/plugins/media/__init__.py +++ /dev/null @@ -1,207 +0,0 @@ -import hashlib -import json -import threading - -from flask import Response - -from platypush.backend.http.app.utils import get_remote_base_url, logger, \ - send_message - -from platypush.backend.http.media.handlers import MediaHandler - -media_map = {} -media_map_lock = threading.RLock() - -# Size for the bytes chunk sent over the media streaming infra -STREAMING_CHUNK_SIZE = 4096 - -# Maximum range size to be sent through the media streamer if Range header -# is not set -STREAMING_BLOCK_SIZE = 3145728 - - -def get_media_url(media_id): - return '{url}/media/{media_id}'.format( - url=get_remote_base_url(), media_id=media_id) - - -def get_media_id(source): - return hashlib.sha1(source.encode()).hexdigest() - - -def register_media(source, subtitles=None): - global media_map, media_map_lock - - media_id = get_media_id(source) - media_url = get_media_url(media_id) - - with media_map_lock: - if media_id in media_map: - return media_map[media_id] - - subfile = None - if subtitles: - req = { - 'type': 'request', - 'action': 'media.subtitles.download', - 'args': { - 'link': subtitles, - 'convert_to_vtt': True, - } - } - - try: - subfile = (send_message(req).output or {}).get('filename') - except Exception as e: - logger().warning('Unable to load subtitle {}: {}' - .format(subtitles, str(e))) - - with media_map_lock: - media_hndl = MediaHandler.build(source, url=media_url, subtitles=subfile) - media_map[media_id] = media_hndl - media_hndl.media_id = media_id - - logger().info('Streaming "{}" on {}'.format(source, media_url)) - return media_hndl - - -def unregister_media(source): - global media_map, media_map_lock - - if source is None: - raise KeyError('No media_id specified') - - media_id = get_media_id(source) - media_info = {} - - with media_map_lock: - if media_id not in media_map: - raise FileNotFoundError('{} is not a registered media_id'. - format(source)) - media_info = media_map.pop(media_id) - - logger().info('Unregistered {} from {}'.format(source, media_info.get('url'))) - return media_info - - -def stream_media(media_id, req): - global STREAMING_BLOCK_SIZE, STREAMING_CHUNK_SIZE - - media_hndl = media_map.get(media_id) - if not media_hndl: - raise FileNotFoundError('{} is not a registered media_id'.format(media_id)) - - range_hdr = req.headers.get('range') - content_length = media_hndl.content_length - status_code = 200 - - headers = { - 'Accept-Ranges': 'bytes', - 'Content-Type': media_hndl.mime_type, - } - - if 'download' in req.args: - headers['Content-Disposition'] = 'attachment' + \ - ('; filename="{}"'.format(media_hndl.filename) if - media_hndl.filename else '') - - if range_hdr: - headers['Accept-Ranges'] = 'bytes' - from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-') - from_bytes = int(from_bytes) - - if not to_bytes: - to_bytes = content_length - 1 - content_length -= from_bytes - else: - to_bytes = int(to_bytes) - content_length = to_bytes - from_bytes - - status_code = 206 - headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format( - start=from_bytes, end=to_bytes, - size=media_hndl.content_length) - else: - from_bytes = 0 - to_bytes = STREAMING_BLOCK_SIZE - - headers['Content-Length'] = content_length - - return Response(media_hndl.get_data( - from_bytes=from_bytes, to_bytes=to_bytes, - chunk_size=STREAMING_CHUNK_SIZE), - status_code, headers=headers, mimetype=headers['Content-Type'], - direct_passthrough=True) - - -def add_subtitles(media_id, req): - """ - This route can be used to download and/or expose subtitles files - associated to a media file - """ - - media_hndl = media_map.get(media_id) - if not media_hndl: - raise FileNotFoundError('{} is not a registered media_id'.format(media_id)) - - subfile = None - if req.data: - subfile = json.loads(req.data.decode('utf-8')).get('filename') - if not subfile: - raise AttributeError('No filename specified in the request') - - if not subfile: - if not media_hndl.path: - raise NotImplementedError( - 'Subtitles are currently only supported for local media files') - - req = { - 'type': 'request', - 'action': 'media.subtitles.get_subtitles', - 'args': { - 'resource': media_hndl.path, - } - } - - try: - subtitles = send_message(req).output or [] - except Exception as e: - raise RuntimeError('Could not get subtitles: {}'.format(str(e))) - - if not subtitles: - raise FileNotFoundError('No subtitles found for resource {}'. - format(media_hndl.path)) - - req = { - 'type': 'request', - 'action': 'media.subtitles.download', - 'args': { - 'link': subtitles[0].get('SubDownloadLink'), - 'media_resource': media_hndl.path, - 'convert_to_vtt': True, - } - } - - subfile = (send_message(req).output or {}).get('filename') - - media_hndl.set_subtitles(subfile) - return { - 'filename': subfile, - 'url': get_remote_base_url() + '/media/subtitles/' + media_id + '.vtt', - } - - -def remove_subtitles(media_id): - media_hndl = media_map.get(media_id) - if not media_hndl: - raise FileNotFoundError('{} is not a registered media_id'. - format(media_id)) - - if not media_hndl.subtitles: - raise FileNotFoundError('{} has no subtitles attached'. - format(media_id)) - - media_hndl.remove_subtitles() - return {} - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/plugins/media/stream.py b/platypush/backend/http/app/routes/plugins/media/stream.py deleted file mode 100644 index 38f3081011..0000000000 --- a/platypush/backend/http/app/routes/plugins/media/stream.py +++ /dev/null @@ -1,87 +0,0 @@ -import json - -from flask import abort, jsonify, request, Blueprint - -from platypush.backend.http.app import template_folder -from platypush.backend.http.app.utils import logger, get_remote_base_url -from platypush.backend.http.app.routes.plugins.media import media_map, \ - stream_media, register_media, unregister_media - -media = Blueprint('media', __name__, template_folder=template_folder) - -# Declare routes list -__routes__ = [ - media, -] - - -@media.route('/media', methods=['GET']) -def get_media(): - """ - This route can be used to get the list of registered streams - """ - return jsonify([dict(media) for media in media_map.values()]) - - -@media.route('/media', methods=['PUT']) -def add_media(): - """ - This route can be used by the `media` plugin to add streaming content over HTTP - """ - - args = {} - try: - args = json.loads(request.data.decode('utf-8')) - except Exception as e: - abort(400, 'Invalid JSON request: {}'.format(str(e))) - - source = args.get('source') - if not source: - abort(400, 'The request does not contain any source') - - subtitles = args.get('subtitles') - try: - media_hndl = register_media(source, subtitles) - ret = dict(media_hndl) - if media_hndl.subtitles: - ret['subtitles_url'] = get_remote_base_url() + \ - '/media/subtitles/' + media_hndl.media_id + '.vtt' - return jsonify(ret) - except FileNotFoundError as e: - abort(404, str(e)) - except AttributeError as e: - abort(400, str(e)) - except Exception as e: - logger().exception(e) - abort(500, str(e)) - - -@media.route('/media/', methods=['GET', 'DELETE']) -def stream_or_delete_media(media_id): - """ - This route can be used to stream active media points or unregister - a mounted media stream - """ - - # Remove the extension - media_id = '.'.join(media_id.split('.')[:-1]) - - try: - if request.method == 'GET': - if media_id is None: - return jsonify([dict(media) for media in media_map.values()]) - else: - return stream_media(media_id, request) - else: - media_info = unregister_media(media_id) - return jsonify(media_info) - except (AttributeError, FileNotFoundError) as e: - abort(404, str(e)) - except KeyError as e: - abort(400, str(e)) - except Exception as e: - logger().exception(e) - abort(500, str(e)) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/plugins/media/subtitles.py b/platypush/backend/http/app/routes/plugins/media/subtitles.py deleted file mode 100644 index 3a6fff0107..0000000000 --- a/platypush/backend/http/app/routes/plugins/media/subtitles.py +++ /dev/null @@ -1,51 +0,0 @@ -import os - -from flask import abort, jsonify, request, send_from_directory, Blueprint - -from platypush.backend.http.app import template_folder -from platypush.backend.http.app.routes.plugins.media import media_map, \ - remove_subtitles, add_subtitles - -subtitles = Blueprint('subtitles', __name__, template_folder=template_folder) - -# Declare routes list -__routes__ = [ - subtitles, -] - -@subtitles.route('/media/subtitles/.vtt', methods=['GET', 'POST', 'DELETE']) -def handle_subtitles(media_id): - """ - This route can be used to download and/or expose subtitle files - associated to a media file - """ - - if request.method == 'GET': - media_hndl = media_map.get(media_id) - if not media_hndl: - abort(404, 'No such media') - - if not media_hndl.subtitles: - abort(404, 'The media has no subtitles attached') - - return send_from_directory( - os.path.dirname(media_hndl.subtitles), - os.path.basename(media_hndl.subtitles), - mimetype='text/vtt') - - try: - if request.method == 'DELETE': - return jsonify(remove_subtitles(media_id)) - else: - return jsonify(add_subtitles(media_id, request)) - except FileNotFoundError as e: - abort(404, str(e)) - except AttributeError as e: - abort(400, str(e)) - except NotImplementedError as e: - abort(422, str(e)) - except Exception as e: - abort(500, str(e)) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/streaming/plugins/media/__init__.py b/platypush/backend/http/app/streaming/plugins/media/__init__.py new file mode 100644 index 0000000000..62689684f2 --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/__init__.py @@ -0,0 +1,5 @@ +from ._stream import MediaStreamRoute +from ._subtitles import MediaSubtitlesRoute + + +__all__ = ["MediaStreamRoute", "MediaSubtitlesRoute"] diff --git a/platypush/backend/http/app/streaming/plugins/media/_constants.py b/platypush/backend/http/app/streaming/plugins/media/_constants.py new file mode 100644 index 0000000000..0ed0dbe1ef --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/_constants.py @@ -0,0 +1,17 @@ +from typing import Dict + +from platypush.backend.http.media.handlers import MediaHandler + + +# Size for the bytes chunk sent over the media streaming infra +STREAMING_CHUNK_SIZE = 4096 + +# Maximum range size to be sent through the media streamer if Range header +# is not set +STREAMING_BLOCK_SIZE = 3145728 + +# Name of the Redis variable used to store the media map across several +# Web processes +MEDIA_MAP_VAR = 'platypush__stream_media_map' + +MediaMap = Dict[str, MediaHandler] diff --git a/platypush/backend/http/app/streaming/plugins/media/_register.py b/platypush/backend/http/app/streaming/plugins/media/_register.py new file mode 100644 index 0000000000..de8d0fa852 --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/_register.py @@ -0,0 +1,44 @@ +from typing import Optional + +from platypush.backend.http.app.utils import logger, send_request +from platypush.backend.http.media.handlers import MediaHandler + +from ._registry import load_media_map, save_media_map + + +def get_media_url(media_id: str) -> str: + """ + :returns: The URL of a media file given its ID + """ + return f'/media/{media_id}' + + +def register_media(source: str, subtitles: Optional[str] = None) -> MediaHandler: + """ + Registers a media file and returns its associated media handler. + """ + media_id = MediaHandler.get_media_id(source) + media_url = get_media_url(media_id) + media_map = load_media_map() + subfile = None + + if subtitles: + req = { + 'type': 'request', + 'action': 'media.subtitles.download', + 'args': { + 'link': subtitles, + 'convert_to_vtt': True, + }, + } + + try: + subfile = (send_request(req) or {}).get('filename') + except Exception as e: + logger().warning('Unable to load subtitle %s: %s', subtitles, e) + + media_hndl = MediaHandler.build(source, url=media_url, subtitles=subfile) + media_map[media_id] = media_hndl + save_media_map(media_map) + logger().info('Streaming "%s" on %s', source, media_url) + return media_hndl diff --git a/platypush/backend/http/app/streaming/plugins/media/_registry.py b/platypush/backend/http/app/streaming/plugins/media/_registry.py new file mode 100644 index 0000000000..55202e72f1 --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/_registry.py @@ -0,0 +1,40 @@ +import json +import multiprocessing + +from platypush.backend.http.app.utils import logger +from platypush.backend.http.media.handlers import MediaHandler +from platypush.message import Message +from platypush.utils import get_redis + +from ._constants import MEDIA_MAP_VAR, MediaMap + +media_map_lock = multiprocessing.RLock() + + +def load_media_map() -> MediaMap: + """ + Load the media map from the server. + """ + with media_map_lock: + redis = get_redis() + try: + media_map = json.loads( + ((redis.mget(MEDIA_MAP_VAR) or [None])[0] or b'{}').decode() # type: ignore + ) + except Exception as e: + logger().warning('Could not load media map: %s', e) + return {} + + return { + media_id: MediaHandler.build(**media_info) + for media_id, media_info in media_map.items() + } + + +def save_media_map(new_map: MediaMap): + """ + Updates the stored media map on the server. + """ + with media_map_lock: + redis = get_redis() + redis.mset({MEDIA_MAP_VAR: json.dumps(new_map, cls=Message.Encoder)}) diff --git a/platypush/backend/http/app/streaming/plugins/media/_stream.py b/platypush/backend/http/app/streaming/plugins/media/_stream.py new file mode 100644 index 0000000000..1317d4b2c6 --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/_stream.py @@ -0,0 +1,149 @@ +import json +from typing import Optional + +from tornado.web import stream_request_body + +from platypush.backend.http.app.streaming import StreamingRoute + +from ._constants import STREAMING_BLOCK_SIZE, STREAMING_CHUNK_SIZE +from ._register import register_media +from ._registry import load_media_map +from ._unregister import unregister_media + + +@stream_request_body +class MediaStreamRoute(StreamingRoute): + """ + Route for media streams. + """ + + SUPPORTED_METHODS = ['GET', 'PUT', 'DELETE'] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._body = b'' + + @classmethod + def path(cls) -> str: + return r"^/media/?([a-zA-Z0-9_.]+)?$" + + @property + def auth_required(self) -> bool: + return False + + def get(self, media_id: Optional[str] = None): + """ + Streams a media resource by ID. + """ + + # Return the list of registered streaming media resources if no ID is + # specified + if not media_id: + self.get_media() + return + + # Strip the extension + media_id = '.'.join(media_id.split('.')[:-1]) + + try: + self.stream_media(media_id) + except Exception as e: + self._on_error(e) + + def put(self, *_, **__): + """ + The `PUT` route is used to prepare a new media resource for streaming. + """ + try: + self.add_media() + except Exception as e: + self._on_error(e) + + def delete(self, media_id: Optional[str] = None): + """ + Removes the given media_id from the map of streaming media. + """ + media_info = unregister_media(media_id) + self.write(json.dumps(media_info)) + + def data_received(self, chunk: bytes): + self._body += chunk + + def add_media(self): + """ + Adds a new media resource to the map of streaming media. + """ + args = {} + try: + args = json.loads(self._body) + except Exception as e: + raise AssertionError(f'Invalid JSON request: {e}') from e + + source = args.get('source') + assert source, 'The request does not contain any source' + subtitles = args.get('subtitles') + media_hndl = register_media(source, subtitles) + ret = media_hndl.to_json() + if media_hndl.subtitles: + ret['subtitles_url'] = f'/media/subtitles/{media_hndl.media_id}.vtt' + + self.write(json.dumps(ret)) + + def get_media(self): + """ + Returns the list of registered media resources. + """ + self.add_header('Content-Type', 'application/json') + self.finish(json.dumps([dict(media) for media in load_media_map().values()])) + + def stream_media(self, media_id: str): + """ + Route to stream a media file given its ID. + """ + media_hndl = load_media_map().get(media_id) + if not media_hndl: + raise FileNotFoundError(f'{media_id} is not a registered media_id') + + range_hdr = self.request.headers.get('Range') + content_length = media_hndl.content_length + + self.add_header('Accept-Ranges', 'bytes') + self.add_header('Content-Type', media_hndl.mime_type) + + if 'download' in self.request.arguments: + self.add_header( + 'Content-Disposition', + 'attachment' + + ('; filename="{media_hndl.filename}"' if media_hndl.filename else ''), + ) + + if range_hdr: + from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-') + from_bytes = int(from_bytes) + + if not to_bytes: + to_bytes = content_length - 1 + content_length -= from_bytes + else: + to_bytes = int(to_bytes) + content_length = to_bytes - from_bytes + + self.set_status(206) + self.add_header( + 'Content-Range', + f'bytes {from_bytes}-{to_bytes}/{media_hndl.content_length}', + ) + else: + from_bytes = 0 + to_bytes = STREAMING_BLOCK_SIZE + + self.add_header('Content-Length', str(content_length)) + for chunk in media_hndl.get_data( + from_bytes=from_bytes, + to_bytes=to_bytes, + chunk_size=STREAMING_CHUNK_SIZE, + ): + self.write(chunk) + self.flush() + + self.finish() diff --git a/platypush/backend/http/app/streaming/plugins/media/_subtitles.py b/platypush/backend/http/app/streaming/plugins/media/_subtitles.py new file mode 100644 index 0000000000..3df407b89b --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/_subtitles.py @@ -0,0 +1,140 @@ +import json + +from tornado.web import stream_request_body + +from platypush.backend.http.app.streaming import StreamingRoute +from platypush.backend.http.app.utils.bus import send_request + +from ._registry import load_media_map + + +@stream_request_body +class MediaSubtitlesRoute(StreamingRoute): + """ + Route for media stream subtitles. + """ + + SUPPORTED_METHODS = ['GET', 'POST', 'DELETE'] + + @classmethod + def path(cls) -> str: + return r"^/media/subtitles/([a-zA-Z0-9_.]+)\.vtt$" + + @property + def auth_required(self) -> bool: + return False + + def get(self, media_id: str): + """ + GET route to retrieve the subtitles for the given media_id. + """ + try: + self.get_subtitles(media_id) + except Exception as e: + self._on_error(e) + + def post(self, media_id: str): + """ + POST route to add subtitles to the given media_id. + """ + try: + self.add_subtitles(media_id) + except Exception as e: + self._on_error(e) + + def delete(self, media_id: str): + """ + DELETE route to remove the subtitles for the given media_id. + """ + try: + self.remove_subtitles(media_id) + except Exception as e: + self._on_error(e) + + def get_subtitles(self, media_id: str): + """ + Retrieves the subtitles for the given media_id. + """ + + media_hndl = load_media_map().get(media_id) + if not media_hndl: + raise FileNotFoundError(f'{media_id} is not a registered media_id') + + if not media_hndl.subtitles: + raise FileNotFoundError(f'{media_id} has no subtitles') + + with open(media_hndl.subtitles) as f: + self.set_header('Content-Type', 'text/vtt') + self.finish(f.read()) + + @staticmethod + def remove_subtitles(media_id: str): + """ + Remove the current subtitle track from a streamed from a media file. + """ + media_hndl = load_media_map().get(media_id) + if not media_hndl: + raise FileNotFoundError(f'{media_id} is not a registered media_id') + + if not media_hndl.subtitles: + raise FileNotFoundError(f'{media_id} has no subtitles attached') + + media_hndl.remove_subtitles() + return {} + + def add_subtitles(self, media_id: str): + """ + This route can be used to download and/or expose subtitles files + associated to a media file + """ + + media_hndl = load_media_map().get(media_id) + if not media_hndl: + raise FileNotFoundError(f'{media_id} is not a registered media_id') + + subfile = None + if self.request.body: + subfile = json.loads(self.request.body).get('filename') + assert subfile, 'No filename specified in the request' + + if not subfile: + if not media_hndl.path: + raise NotImplementedError( + 'Subtitles are currently only supported for local media files' + ) + + req = { + 'type': 'request', + 'action': 'media.subtitles.get_subtitles', + 'args': { + 'resource': media_hndl.path, + }, + } + + try: + subtitles = send_request(req) or [] + except Exception as e: + raise RuntimeError(f'Could not get subtitles: {e}') from e + + if not subtitles: + raise FileNotFoundError( + f'No subtitles found for resource {media_hndl.path}' + ) + + req = { + 'type': 'request', + 'action': 'media.subtitles.download', + 'args': { + 'link': subtitles[0].get('SubDownloadLink'), + 'media_resource': media_hndl.path, + 'convert_to_vtt': True, + }, + } + + subfile = (send_request(req) or {}).get('filename') + + media_hndl.set_subtitles(subfile) + return { + 'filename': subfile, + 'url': f'/media/subtitles/{media_id}.vtt', + } diff --git a/platypush/backend/http/app/streaming/plugins/media/_unregister.py b/platypush/backend/http/app/streaming/plugins/media/_unregister.py new file mode 100644 index 0000000000..a82236e933 --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/media/_unregister.py @@ -0,0 +1,26 @@ +from typing import Optional + +from platypush.backend.http.app.utils import logger +from platypush.backend.http.media.handlers import MediaHandler + +from ._registry import load_media_map, save_media_map, media_map_lock + + +def unregister_media(source: Optional[str] = None): + """ + Unregisters a media streaming URL file given its source. + """ + assert source is not None, 'No media_id specified' + media_id = MediaHandler.get_media_id(source) + media_info = {} + + with media_map_lock: + media_map = load_media_map() + if media_id not in media_map: + raise FileNotFoundError(f'{source} is not a registered media_id') + + media_info = media_map.pop(media_id) + save_media_map(media_map) + + logger().info('Unregistered %s from %s', source, media_info.url) + return media_info diff --git a/platypush/backend/http/media/handlers/__init__.py b/platypush/backend/http/media/handlers/__init__.py index 2d63ab81c7..99948c5cd7 100644 --- a/platypush/backend/http/media/handlers/__init__.py +++ b/platypush/backend/http/media/handlers/__init__.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod import hashlib import logging -from typing import Optional +from typing import Generator, Optional from platypush.message import JSONAble @@ -66,7 +66,7 @@ class MediaHandler(JSONAble, ABC): from_bytes: Optional[int] = None, to_bytes: Optional[int] = None, chunk_size: Optional[int] = None, - ) -> bytes: + ) -> Generator[bytes, None, None]: raise NotImplementedError() @property diff --git a/platypush/backend/http/media/handlers/file.py b/platypush/backend/http/media/handlers/file.py index 3aafba5966..e299c9359c 100644 --- a/platypush/backend/http/media/handlers/file.py +++ b/platypush/backend/http/media/handlers/file.py @@ -23,20 +23,17 @@ class FileHandler(MediaHandler): self.filename = self.path.split('/')[-1] if not os.path.isfile(self.path): - raise FileNotFoundError(f'{self.path} is not a valid file') + raise FileNotFoundError(self.path) self.mime_type = get_mime_type(source) assert self.mime_type, f'Could not detect mime type for {source}' - if ( - self.mime_type[:5] not in ['video', 'audio', 'image'] - and self.mime_type != 'application/octet-stream' - ): - raise AttributeError( - f'{source} is not a valid media file (detected format: {self.mime_type})' - ) + assert ( + self.mime_type[:5] in ['video', 'audio', 'image'] + or self.mime_type == 'application/octet-stream' + ), f'{source} is not a valid media file (detected format: {self.mime_type})' self.extension = mimetypes.guess_extension(self.mime_type) - if self.url: + if self.url and self.extension: self.url += self.extension self.content_length = os.path.getsize(self.path)