Black/LINT pass for media handler routes.

This commit is contained in:
Fabio Manganiello 2023-11-04 16:09:35 +01:00
parent 11c3b7820d
commit f7fe844296
Signed by: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 89 additions and 30 deletions

View file

@ -1,7 +1,12 @@
from abc import ABC, abstractmethod
import hashlib
import logging
from typing import Optional
from platypush.message import JSONAble
class MediaHandler:
class MediaHandler(JSONAble, ABC):
"""
Abstract class to manage media handlers that can be streamed over the HTTP
server through the `/media` endpoint.
@ -9,17 +14,26 @@ class MediaHandler:
prefix_handlers = []
def __init__(self, source, filename=None,
mime_type='application/octet-stream', name=None, url=None,
subtitles=None):
matched_handlers = [hndl for hndl in self.prefix_handlers
if source.startswith(hndl)]
def __init__(
self,
source: str,
*_,
filename: Optional[str] = None,
mime_type: str = 'application/octet-stream',
name: Optional[str] = None,
url: Optional[str] = None,
subtitles: Optional[str] = None,
**__,
):
matched_handlers = [
hndl for hndl in self.prefix_handlers if source.startswith(hndl)
]
if not matched_handlers:
raise AttributeError(('No matched handlers found for source "{}" ' +
'through {}. Supported handlers: {}').format(
source, self.__class__.__name__,
self.prefix_handlers))
raise AttributeError(
f'No matched handlers found for source "{source}" '
f'through {self.__class__.__name__}. Supported handlers: {self.prefix_handlers}'
)
self.name = name
self.path = None
@ -32,7 +46,7 @@ class MediaHandler:
self._matched_handler = matched_handlers[0]
@classmethod
def build(cls, source, *args, **kwargs):
def build(cls, source: str, *args, **kwargs) -> 'MediaHandler':
errors = {}
for hndl_class in supported_handlers:
@ -42,32 +56,68 @@ class MediaHandler:
logging.exception(e)
errors[hndl_class.__name__] = str(e)
raise AttributeError(('The source {} has no handlers associated. ' +
'Errors: {}').format(source, errors))
raise AttributeError(
f'The source {source} has no handlers associated. Errors: {errors}'
)
def get_data(self, from_bytes=None, to_bytes=None, chunk_size=None):
@abstractmethod
def get_data(
self,
from_bytes: Optional[int] = None,
to_bytes: Optional[int] = None,
chunk_size: Optional[int] = None,
) -> bytes:
raise NotImplementedError()
def set_subtitles(self, subtitles_file):
@property
def media_id(self) -> str:
"""
:returns: The unique ID of the media handler.
"""
return self.get_media_id(self.source)
def set_subtitles(self, subtitles_file: Optional[str]):
self.subtitles = subtitles_file
def remove_subtitles(self):
self.subtitles = None
def __iter__(self):
for attr in ['name', 'source', 'mime_type', 'url', 'subtitles',
'prefix_handlers', 'media_id']:
"""
Iterate over the attributes of the media handler.
"""
for attr in [
'name',
'source',
'mime_type',
'url',
'subtitles',
'prefix_handlers',
'media_id',
]:
if hasattr(self, attr):
yield attr, getattr(self, attr)
@staticmethod
def get_media_id(source: str) -> str:
"""
:returns: The ID of a media file given its source.
"""
return hashlib.sha1(source.encode()).hexdigest()
from .file import FileHandler
def to_json(self):
"""
:returns: A dictionary representation of the media handler.
"""
return dict(self)
from .file import FileHandler # noqa
__all__ = ['MediaHandler', 'FileHandler']
supported_handlers = [eval(hndl) for hndl in __all__
if hndl != MediaHandler.__name__]
supported_handlers = [eval(hndl) for hndl in __all__ if hndl != MediaHandler.__name__]
# vim:sw=4:ts=4:et:

View file

@ -8,30 +8,38 @@ from . import MediaHandler
class FileHandler(MediaHandler):
"""
Handler for local media files.
"""
prefix_handlers = ['file://']
def __init__(self, source, *args, **kwargs):
super().__init__(source, *args, **kwargs)
self.path = os.path.abspath(os.path.expanduser(
self.source[len(self._matched_handler):]))
self.path = os.path.abspath(
os.path.expanduser(self.source[len(self._matched_handler) :])
)
self.filename = self.path.split('/')[-1]
if not os.path.isfile(self.path):
raise FileNotFoundError('{} is not a valid file'.
format(self.path))
raise FileNotFoundError(f'{self.path} is not a valid file')
self.mime_type = get_mime_type(source)
if self.mime_type[:5] not in ['video', 'audio', 'image'] and self.mime_type != 'application/octet-stream':
raise AttributeError('{} is not a valid media file (detected format: {})'.
format(source, self.mime_type))
assert self.mime_type, f'Could not detect mime type for {source}'
if (
self.mime_type[:5] not in ['video', 'audio', 'image']
and self.mime_type != 'application/octet-stream'
):
raise AttributeError(
f'{source} is not a valid media file (detected format: {self.mime_type})'
)
self.extension = mimetypes.guess_extension(self.mime_type)
if self.url:
self.url += self.extension
self.content_length = os.path.getsize(self.path)
def get_data(self, from_bytes=None, to_bytes=None, chunk_size=None):
if from_bytes is None:
from_bytes = 0
@ -42,8 +50,9 @@ class FileHandler(MediaHandler):
with open(self.path, 'rb') as f:
f.seek(from_bytes)
for chunk in iter(functools.partial(
f.read, min(to_bytes-from_bytes, chunk_size)), b''):
for chunk in iter(
functools.partial(f.read, min(to_bytes - from_bytes, chunk_size)), b''
):
yield chunk