Major rewrite of the media routes.

- Streaming and media subtitles endpoints moved from Flask to Tornado
  routes - the old Flask streaming route no longer worked behind a
  Tornado server.

- Storing the streaming state on Redis rather than in a local variable,
  or different Tornado processes may end up with different copies of the
  registry.

Closes: #336
This commit is contained in:
Fabio Manganiello 2023-11-05 01:33:36 +01:00
parent 0e2738d849
commit e45fb9c8ac
Signed by: blacklight
GPG key ID: D90FBA7F76362774
12 changed files with 429 additions and 356 deletions

View file

@ -1,207 +0,0 @@
import hashlib
import json
import threading
from flask import Response
from platypush.backend.http.app.utils import get_remote_base_url, logger, \
send_message
from platypush.backend.http.media.handlers import MediaHandler
media_map = {}
media_map_lock = threading.RLock()
# Size for the bytes chunk sent over the media streaming infra
STREAMING_CHUNK_SIZE = 4096
# Maximum range size to be sent through the media streamer if Range header
# is not set
STREAMING_BLOCK_SIZE = 3145728
def get_media_url(media_id):
return '{url}/media/{media_id}'.format(
url=get_remote_base_url(), media_id=media_id)
def get_media_id(source):
return hashlib.sha1(source.encode()).hexdigest()
def register_media(source, subtitles=None):
global media_map, media_map_lock
media_id = get_media_id(source)
media_url = get_media_url(media_id)
with media_map_lock:
if media_id in media_map:
return media_map[media_id]
subfile = None
if subtitles:
req = {
'type': 'request',
'action': 'media.subtitles.download',
'args': {
'link': subtitles,
'convert_to_vtt': True,
}
}
try:
subfile = (send_message(req).output or {}).get('filename')
except Exception as e:
logger().warning('Unable to load subtitle {}: {}'
.format(subtitles, str(e)))
with media_map_lock:
media_hndl = MediaHandler.build(source, url=media_url, subtitles=subfile)
media_map[media_id] = media_hndl
media_hndl.media_id = media_id
logger().info('Streaming "{}" on {}'.format(source, media_url))
return media_hndl
def unregister_media(source):
global media_map, media_map_lock
if source is None:
raise KeyError('No media_id specified')
media_id = get_media_id(source)
media_info = {}
with media_map_lock:
if media_id not in media_map:
raise FileNotFoundError('{} is not a registered media_id'.
format(source))
media_info = media_map.pop(media_id)
logger().info('Unregistered {} from {}'.format(source, media_info.get('url')))
return media_info
def stream_media(media_id, req):
global STREAMING_BLOCK_SIZE, STREAMING_CHUNK_SIZE
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.format(media_id))
range_hdr = req.headers.get('range')
content_length = media_hndl.content_length
status_code = 200
headers = {
'Accept-Ranges': 'bytes',
'Content-Type': media_hndl.mime_type,
}
if 'download' in req.args:
headers['Content-Disposition'] = 'attachment' + \
('; filename="{}"'.format(media_hndl.filename) if
media_hndl.filename else '')
if range_hdr:
headers['Accept-Ranges'] = 'bytes'
from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-')
from_bytes = int(from_bytes)
if not to_bytes:
to_bytes = content_length - 1
content_length -= from_bytes
else:
to_bytes = int(to_bytes)
content_length = to_bytes - from_bytes
status_code = 206
headers['Content-Range'] = 'bytes {start}-{end}/{size}'.format(
start=from_bytes, end=to_bytes,
size=media_hndl.content_length)
else:
from_bytes = 0
to_bytes = STREAMING_BLOCK_SIZE
headers['Content-Length'] = content_length
return Response(media_hndl.get_data(
from_bytes=from_bytes, to_bytes=to_bytes,
chunk_size=STREAMING_CHUNK_SIZE),
status_code, headers=headers, mimetype=headers['Content-Type'],
direct_passthrough=True)
def add_subtitles(media_id, req):
"""
This route can be used to download and/or expose subtitles files
associated to a media file
"""
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.format(media_id))
subfile = None
if req.data:
subfile = json.loads(req.data.decode('utf-8')).get('filename')
if not subfile:
raise AttributeError('No filename specified in the request')
if not subfile:
if not media_hndl.path:
raise NotImplementedError(
'Subtitles are currently only supported for local media files')
req = {
'type': 'request',
'action': 'media.subtitles.get_subtitles',
'args': {
'resource': media_hndl.path,
}
}
try:
subtitles = send_message(req).output or []
except Exception as e:
raise RuntimeError('Could not get subtitles: {}'.format(str(e)))
if not subtitles:
raise FileNotFoundError('No subtitles found for resource {}'.
format(media_hndl.path))
req = {
'type': 'request',
'action': 'media.subtitles.download',
'args': {
'link': subtitles[0].get('SubDownloadLink'),
'media_resource': media_hndl.path,
'convert_to_vtt': True,
}
}
subfile = (send_message(req).output or {}).get('filename')
media_hndl.set_subtitles(subfile)
return {
'filename': subfile,
'url': get_remote_base_url() + '/media/subtitles/' + media_id + '.vtt',
}
def remove_subtitles(media_id):
media_hndl = media_map.get(media_id)
if not media_hndl:
raise FileNotFoundError('{} is not a registered media_id'.
format(media_id))
if not media_hndl.subtitles:
raise FileNotFoundError('{} has no subtitles attached'.
format(media_id))
media_hndl.remove_subtitles()
return {}
# vim:sw=4:ts=4:et:

View file

@ -1,87 +0,0 @@
import json
from flask import abort, jsonify, request, Blueprint
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import logger, get_remote_base_url
from platypush.backend.http.app.routes.plugins.media import media_map, \
stream_media, register_media, unregister_media
media = Blueprint('media', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
media,
]
@media.route('/media', methods=['GET'])
def get_media():
"""
This route can be used to get the list of registered streams
"""
return jsonify([dict(media) for media in media_map.values()])
@media.route('/media', methods=['PUT'])
def add_media():
"""
This route can be used by the `media` plugin to add streaming content over HTTP
"""
args = {}
try:
args = json.loads(request.data.decode('utf-8'))
except Exception as e:
abort(400, 'Invalid JSON request: {}'.format(str(e)))
source = args.get('source')
if not source:
abort(400, 'The request does not contain any source')
subtitles = args.get('subtitles')
try:
media_hndl = register_media(source, subtitles)
ret = dict(media_hndl)
if media_hndl.subtitles:
ret['subtitles_url'] = get_remote_base_url() + \
'/media/subtitles/' + media_hndl.media_id + '.vtt'
return jsonify(ret)
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except Exception as e:
logger().exception(e)
abort(500, str(e))
@media.route('/media/<media_id>', methods=['GET', 'DELETE'])
def stream_or_delete_media(media_id):
"""
This route can be used to stream active media points or unregister
a mounted media stream
"""
# Remove the extension
media_id = '.'.join(media_id.split('.')[:-1])
try:
if request.method == 'GET':
if media_id is None:
return jsonify([dict(media) for media in media_map.values()])
else:
return stream_media(media_id, request)
else:
media_info = unregister_media(media_id)
return jsonify(media_info)
except (AttributeError, FileNotFoundError) as e:
abort(404, str(e))
except KeyError as e:
abort(400, str(e))
except Exception as e:
logger().exception(e)
abort(500, str(e))
# vim:sw=4:ts=4:et:

View file

@ -1,51 +0,0 @@
import os
from flask import abort, jsonify, request, send_from_directory, Blueprint
from platypush.backend.http.app import template_folder
from platypush.backend.http.app.routes.plugins.media import media_map, \
remove_subtitles, add_subtitles
subtitles = Blueprint('subtitles', __name__, template_folder=template_folder)
# Declare routes list
__routes__ = [
subtitles,
]
@subtitles.route('/media/subtitles/<media_id>.vtt', methods=['GET', 'POST', 'DELETE'])
def handle_subtitles(media_id):
"""
This route can be used to download and/or expose subtitle files
associated to a media file
"""
if request.method == 'GET':
media_hndl = media_map.get(media_id)
if not media_hndl:
abort(404, 'No such media')
if not media_hndl.subtitles:
abort(404, 'The media has no subtitles attached')
return send_from_directory(
os.path.dirname(media_hndl.subtitles),
os.path.basename(media_hndl.subtitles),
mimetype='text/vtt')
try:
if request.method == 'DELETE':
return jsonify(remove_subtitles(media_id))
else:
return jsonify(add_subtitles(media_id, request))
except FileNotFoundError as e:
abort(404, str(e))
except AttributeError as e:
abort(400, str(e))
except NotImplementedError as e:
abort(422, str(e))
except Exception as e:
abort(500, str(e))
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,5 @@
from ._stream import MediaStreamRoute
from ._subtitles import MediaSubtitlesRoute
__all__ = ["MediaStreamRoute", "MediaSubtitlesRoute"]

View file

@ -0,0 +1,17 @@
from typing import Dict
from platypush.backend.http.media.handlers import MediaHandler
# Size for the bytes chunk sent over the media streaming infra
STREAMING_CHUNK_SIZE = 4096
# Maximum range size to be sent through the media streamer if Range header
# is not set
STREAMING_BLOCK_SIZE = 3145728
# Name of the Redis variable used to store the media map across several
# Web processes
MEDIA_MAP_VAR = 'platypush__stream_media_map'
MediaMap = Dict[str, MediaHandler]

View file

@ -0,0 +1,44 @@
from typing import Optional
from platypush.backend.http.app.utils import logger, send_request
from platypush.backend.http.media.handlers import MediaHandler
from ._registry import load_media_map, save_media_map
def get_media_url(media_id: str) -> str:
"""
:returns: The URL of a media file given its ID
"""
return f'/media/{media_id}'
def register_media(source: str, subtitles: Optional[str] = None) -> MediaHandler:
"""
Registers a media file and returns its associated media handler.
"""
media_id = MediaHandler.get_media_id(source)
media_url = get_media_url(media_id)
media_map = load_media_map()
subfile = None
if subtitles:
req = {
'type': 'request',
'action': 'media.subtitles.download',
'args': {
'link': subtitles,
'convert_to_vtt': True,
},
}
try:
subfile = (send_request(req) or {}).get('filename')
except Exception as e:
logger().warning('Unable to load subtitle %s: %s', subtitles, e)
media_hndl = MediaHandler.build(source, url=media_url, subtitles=subfile)
media_map[media_id] = media_hndl
save_media_map(media_map)
logger().info('Streaming "%s" on %s', source, media_url)
return media_hndl

View file

@ -0,0 +1,40 @@
import json
import multiprocessing
from platypush.backend.http.app.utils import logger
from platypush.backend.http.media.handlers import MediaHandler
from platypush.message import Message
from platypush.utils import get_redis
from ._constants import MEDIA_MAP_VAR, MediaMap
media_map_lock = multiprocessing.RLock()
def load_media_map() -> MediaMap:
"""
Load the media map from the server.
"""
with media_map_lock:
redis = get_redis()
try:
media_map = json.loads(
((redis.mget(MEDIA_MAP_VAR) or [None])[0] or b'{}').decode() # type: ignore
)
except Exception as e:
logger().warning('Could not load media map: %s', e)
return {}
return {
media_id: MediaHandler.build(**media_info)
for media_id, media_info in media_map.items()
}
def save_media_map(new_map: MediaMap):
"""
Updates the stored media map on the server.
"""
with media_map_lock:
redis = get_redis()
redis.mset({MEDIA_MAP_VAR: json.dumps(new_map, cls=Message.Encoder)})

View file

@ -0,0 +1,149 @@
import json
from typing import Optional
from tornado.web import stream_request_body
from platypush.backend.http.app.streaming import StreamingRoute
from ._constants import STREAMING_BLOCK_SIZE, STREAMING_CHUNK_SIZE
from ._register import register_media
from ._registry import load_media_map
from ._unregister import unregister_media
@stream_request_body
class MediaStreamRoute(StreamingRoute):
"""
Route for media streams.
"""
SUPPORTED_METHODS = ['GET', 'PUT', 'DELETE']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._body = b''
@classmethod
def path(cls) -> str:
return r"^/media/?([a-zA-Z0-9_.]+)?$"
@property
def auth_required(self) -> bool:
return False
def get(self, media_id: Optional[str] = None):
"""
Streams a media resource by ID.
"""
# Return the list of registered streaming media resources if no ID is
# specified
if not media_id:
self.get_media()
return
# Strip the extension
media_id = '.'.join(media_id.split('.')[:-1])
try:
self.stream_media(media_id)
except Exception as e:
self._on_error(e)
def put(self, *_, **__):
"""
The `PUT` route is used to prepare a new media resource for streaming.
"""
try:
self.add_media()
except Exception as e:
self._on_error(e)
def delete(self, media_id: Optional[str] = None):
"""
Removes the given media_id from the map of streaming media.
"""
media_info = unregister_media(media_id)
self.write(json.dumps(media_info))
def data_received(self, chunk: bytes):
self._body += chunk
def add_media(self):
"""
Adds a new media resource to the map of streaming media.
"""
args = {}
try:
args = json.loads(self._body)
except Exception as e:
raise AssertionError(f'Invalid JSON request: {e}') from e
source = args.get('source')
assert source, 'The request does not contain any source'
subtitles = args.get('subtitles')
media_hndl = register_media(source, subtitles)
ret = media_hndl.to_json()
if media_hndl.subtitles:
ret['subtitles_url'] = f'/media/subtitles/{media_hndl.media_id}.vtt'
self.write(json.dumps(ret))
def get_media(self):
"""
Returns the list of registered media resources.
"""
self.add_header('Content-Type', 'application/json')
self.finish(json.dumps([dict(media) for media in load_media_map().values()]))
def stream_media(self, media_id: str):
"""
Route to stream a media file given its ID.
"""
media_hndl = load_media_map().get(media_id)
if not media_hndl:
raise FileNotFoundError(f'{media_id} is not a registered media_id')
range_hdr = self.request.headers.get('Range')
content_length = media_hndl.content_length
self.add_header('Accept-Ranges', 'bytes')
self.add_header('Content-Type', media_hndl.mime_type)
if 'download' in self.request.arguments:
self.add_header(
'Content-Disposition',
'attachment'
+ ('; filename="{media_hndl.filename}"' if media_hndl.filename else ''),
)
if range_hdr:
from_bytes, to_bytes = range_hdr.replace('bytes=', '').split('-')
from_bytes = int(from_bytes)
if not to_bytes:
to_bytes = content_length - 1
content_length -= from_bytes
else:
to_bytes = int(to_bytes)
content_length = to_bytes - from_bytes
self.set_status(206)
self.add_header(
'Content-Range',
f'bytes {from_bytes}-{to_bytes}/{media_hndl.content_length}',
)
else:
from_bytes = 0
to_bytes = STREAMING_BLOCK_SIZE
self.add_header('Content-Length', str(content_length))
for chunk in media_hndl.get_data(
from_bytes=from_bytes,
to_bytes=to_bytes,
chunk_size=STREAMING_CHUNK_SIZE,
):
self.write(chunk)
self.flush()
self.finish()

View file

@ -0,0 +1,140 @@
import json
from tornado.web import stream_request_body
from platypush.backend.http.app.streaming import StreamingRoute
from platypush.backend.http.app.utils.bus import send_request
from ._registry import load_media_map
@stream_request_body
class MediaSubtitlesRoute(StreamingRoute):
"""
Route for media stream subtitles.
"""
SUPPORTED_METHODS = ['GET', 'POST', 'DELETE']
@classmethod
def path(cls) -> str:
return r"^/media/subtitles/([a-zA-Z0-9_.]+)\.vtt$"
@property
def auth_required(self) -> bool:
return False
def get(self, media_id: str):
"""
GET route to retrieve the subtitles for the given media_id.
"""
try:
self.get_subtitles(media_id)
except Exception as e:
self._on_error(e)
def post(self, media_id: str):
"""
POST route to add subtitles to the given media_id.
"""
try:
self.add_subtitles(media_id)
except Exception as e:
self._on_error(e)
def delete(self, media_id: str):
"""
DELETE route to remove the subtitles for the given media_id.
"""
try:
self.remove_subtitles(media_id)
except Exception as e:
self._on_error(e)
def get_subtitles(self, media_id: str):
"""
Retrieves the subtitles for the given media_id.
"""
media_hndl = load_media_map().get(media_id)
if not media_hndl:
raise FileNotFoundError(f'{media_id} is not a registered media_id')
if not media_hndl.subtitles:
raise FileNotFoundError(f'{media_id} has no subtitles')
with open(media_hndl.subtitles) as f:
self.set_header('Content-Type', 'text/vtt')
self.finish(f.read())
@staticmethod
def remove_subtitles(media_id: str):
"""
Remove the current subtitle track from a streamed from a media file.
"""
media_hndl = load_media_map().get(media_id)
if not media_hndl:
raise FileNotFoundError(f'{media_id} is not a registered media_id')
if not media_hndl.subtitles:
raise FileNotFoundError(f'{media_id} has no subtitles attached')
media_hndl.remove_subtitles()
return {}
def add_subtitles(self, media_id: str):
"""
This route can be used to download and/or expose subtitles files
associated to a media file
"""
media_hndl = load_media_map().get(media_id)
if not media_hndl:
raise FileNotFoundError(f'{media_id} is not a registered media_id')
subfile = None
if self.request.body:
subfile = json.loads(self.request.body).get('filename')
assert subfile, 'No filename specified in the request'
if not subfile:
if not media_hndl.path:
raise NotImplementedError(
'Subtitles are currently only supported for local media files'
)
req = {
'type': 'request',
'action': 'media.subtitles.get_subtitles',
'args': {
'resource': media_hndl.path,
},
}
try:
subtitles = send_request(req) or []
except Exception as e:
raise RuntimeError(f'Could not get subtitles: {e}') from e
if not subtitles:
raise FileNotFoundError(
f'No subtitles found for resource {media_hndl.path}'
)
req = {
'type': 'request',
'action': 'media.subtitles.download',
'args': {
'link': subtitles[0].get('SubDownloadLink'),
'media_resource': media_hndl.path,
'convert_to_vtt': True,
},
}
subfile = (send_request(req) or {}).get('filename')
media_hndl.set_subtitles(subfile)
return {
'filename': subfile,
'url': f'/media/subtitles/{media_id}.vtt',
}

View file

@ -0,0 +1,26 @@
from typing import Optional
from platypush.backend.http.app.utils import logger
from platypush.backend.http.media.handlers import MediaHandler
from ._registry import load_media_map, save_media_map, media_map_lock
def unregister_media(source: Optional[str] = None):
"""
Unregisters a media streaming URL file given its source.
"""
assert source is not None, 'No media_id specified'
media_id = MediaHandler.get_media_id(source)
media_info = {}
with media_map_lock:
media_map = load_media_map()
if media_id not in media_map:
raise FileNotFoundError(f'{source} is not a registered media_id')
media_info = media_map.pop(media_id)
save_media_map(media_map)
logger().info('Unregistered %s from %s', source, media_info.url)
return media_info

View file

@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
import hashlib
import logging
from typing import Optional
from typing import Generator, Optional
from platypush.message import JSONAble
@ -66,7 +66,7 @@ class MediaHandler(JSONAble, ABC):
from_bytes: Optional[int] = None,
to_bytes: Optional[int] = None,
chunk_size: Optional[int] = None,
) -> bytes:
) -> Generator[bytes, None, None]:
raise NotImplementedError()
@property

View file

@ -23,20 +23,17 @@ class FileHandler(MediaHandler):
self.filename = self.path.split('/')[-1]
if not os.path.isfile(self.path):
raise FileNotFoundError(f'{self.path} is not a valid file')
raise FileNotFoundError(self.path)
self.mime_type = get_mime_type(source)
assert self.mime_type, f'Could not detect mime type for {source}'
if (
self.mime_type[:5] not in ['video', 'audio', 'image']
and self.mime_type != 'application/octet-stream'
):
raise AttributeError(
f'{source} is not a valid media file (detected format: {self.mime_type})'
)
assert (
self.mime_type[:5] in ['video', 'audio', 'image']
or self.mime_type == 'application/octet-stream'
), f'{source} is not a valid media file (detected format: {self.mime_type})'
self.extension = mimetypes.guess_extension(self.mime_type)
if self.url:
if self.url and self.extension:
self.url += self.extension
self.content_length = os.path.getsize(self.path)