diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index a4879c961..051dc983f 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -16,7 +16,7 @@ from tornado.web import Application, FallbackHandler from platypush.backend import Backend from platypush.backend.http.app import application -from platypush.backend.http.app.utils import get_ws_routes +from platypush.backend.http.app.utils import get_streaming_routes, get_ws_routes from platypush.backend.http.app.ws.events import events_redis_topic from platypush.bus.redis import RedisBus @@ -331,7 +331,10 @@ class HttpBackend(Backend): container = WSGIContainer(application) tornado_app = Application( [ - *[(route.path(), route) for route in get_ws_routes()], + *[ + (route.path(), route) + for route in [*get_ws_routes(), *get_streaming_routes()] + ], (r'.*', FallbackHandler, {'fallback': container}), ] ) @@ -352,7 +355,7 @@ class HttpBackend(Backend): ) if self.use_werkzeug_server: - application.config['redis_queue'] = self.bus.redis_queue + application.config['redis_queue'] = self.bus.redis_queue # type: ignore application.run( host=self.bind_address, port=self.port, diff --git a/platypush/backend/http/app/routes/plugins/camera/__init__.py b/platypush/backend/http/app/routes/plugins/camera/__init__.py index 94b1fbefe..e69de29bb 100644 --- a/platypush/backend/http/app/routes/plugins/camera/__init__.py +++ b/platypush/backend/http/app/routes/plugins/camera/__init__.py @@ -1,116 +0,0 @@ -import json -from typing import Optional - -from flask import Blueprint, request -from flask.wrappers import Response - -from platypush.backend.http.app import template_folder -from platypush.backend.http.app.utils import authenticate -from platypush.context import get_plugin -from platypush.plugins.camera import CameraPlugin, Camera, StreamWriter - -camera = Blueprint('camera', __name__, template_folder=template_folder) - -# Declare routes list -__routes__ = [ - camera, -] - - -def get_camera(plugin: str) -> CameraPlugin: - plugin_name = f'camera.{plugin}' - p = get_plugin(plugin_name) - assert p, f'No such plugin: {plugin_name}' - return p - - -def get_frame(session: Camera, timeout: Optional[float] = None) -> Optional[bytes]: - if session.stream: - with session.stream.ready: - session.stream.ready.wait(timeout=timeout) - return session.stream.frame - - -def feed(camera: CameraPlugin, **kwargs): - with camera.open(**kwargs) as session: - camera.start_camera(session) - while True: - frame = get_frame(session, timeout=5.0) - if frame: - yield frame - - -def get_args(kwargs): - kwargs = kwargs.copy() - if 't' in kwargs: - del kwargs['t'] - - for k, v in kwargs.items(): - if k == 'resolution': - v = json.loads('[{}]'.format(v)) - else: - try: - v = int(v) - except (ValueError, TypeError): - try: - v = float(v) - except (ValueError, TypeError): - pass - - kwargs[k] = v - - return kwargs - - -@camera.route('/camera//photo.', methods=['GET']) -@authenticate() -def get_photo(plugin, extension): - plugin = get_camera(plugin) - extension = 'jpeg' if extension in ('jpg', 'jpeg') else extension - - with plugin.open(stream=True, stream_format=extension, frames_dir=None, **get_args(request.args)) as session: - plugin.start_camera(session) - frame = None - for _ in range(session.info.warmup_frames): - frame = get_frame(session) - - return Response(frame, mimetype=session.stream.mimetype) - - -@camera.route('/camera//video.', methods=['GET']) -@authenticate() -def get_video(plugin, extension): - stream_class = StreamWriter.get_class_by_name(extension) - camera = get_camera(plugin) - return Response( - feed(camera, stream=True, stream_format=extension, frames_dir=None, - **get_args(request.args) - ), mimetype=stream_class.mimetype - ) - - -@camera.route('/camera//photo', methods=['GET']) -@authenticate() -def get_photo_default(plugin): - return get_photo(plugin, 'jpeg') - - -@camera.route('/camera//video', methods=['GET']) -@authenticate() -def get_video_default(plugin): - return get_video(plugin, 'mjpeg') - - -@camera.route('/camera//frame', methods=['GET']) -@authenticate() -def get_photo_deprecated(plugin): - return get_photo_default(plugin) - - -@camera.route('/camera//feed', methods=['GET']) -@authenticate() -def get_video_deprecated(plugin): - return get_video_default(plugin) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/routes/plugins/camera/ir/mlx90640.py b/platypush/backend/http/app/routes/plugins/camera/ir/mlx90640.py deleted file mode 100644 index eada98492..000000000 --- a/platypush/backend/http/app/routes/plugins/camera/ir/mlx90640.py +++ /dev/null @@ -1,51 +0,0 @@ -from flask import Blueprint - -from platypush.backend.http.app import template_folder -from platypush.backend.http.app.routes.plugins.camera import get_photo, get_video -from platypush.backend.http.app.utils import authenticate - -camera_ir_mlx90640 = Blueprint('camera-ir-mlx90640', __name__, template_folder=template_folder) - -# Declare routes list -__routes__ = [ - camera_ir_mlx90640, -] - - -@camera_ir_mlx90640.route('/camera/ir/mlx90640/photo.', methods=['GET']) -@authenticate() -def get_photo_route(extension): - return get_photo('ir.mlx90640', extension) - - -@camera_ir_mlx90640.route('/camera/ir/mlx90640/video.', methods=['GET']) -@authenticate() -def get_video_route(extension): - return get_video('ir.mlx90640', extension) - - -@camera_ir_mlx90640.route('/camera/ir/mlx90640/photo', methods=['GET']) -@authenticate() -def get_photo_route_default(): - return get_photo_route('jpeg') - - -@camera_ir_mlx90640.route('/camera/ir/mlx90640/video', methods=['GET']) -@authenticate() -def get_video_route_default(): - return get_video_route('mjpeg') - - -@camera_ir_mlx90640.route('/camera/ir/mlx90640/frame', methods=['GET']) -@authenticate() -def get_photo_route_deprecated(): - return get_photo_route_default() - - -@camera_ir_mlx90640.route('/camera/ir/mlx90640/feed', methods=['GET']) -@authenticate() -def get_video_route_deprecated(): - return get_video_route_default() - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/app/streaming/__init__.py b/platypush/backend/http/app/streaming/__init__.py new file mode 100644 index 000000000..b174974e7 --- /dev/null +++ b/platypush/backend/http/app/streaming/__init__.py @@ -0,0 +1,3 @@ +from ._base import StreamingRoute, logger + +__all__ = ['StreamingRoute', 'logger'] diff --git a/platypush/backend/http/app/streaming/_base.py b/platypush/backend/http/app/streaming/_base.py new file mode 100644 index 000000000..e162eeed0 --- /dev/null +++ b/platypush/backend/http/app/streaming/_base.py @@ -0,0 +1,50 @@ +from abc import ABC, abstractmethod +from http.client import responses +import json +from logging import getLogger +from typing import Optional +from typing_extensions import override + +from tornado.web import RequestHandler, stream_request_body + +from platypush.backend.http.app.utils.auth import AuthStatus, get_auth_status + +logger = getLogger(__name__) + + +@stream_request_body +class StreamingRoute(RequestHandler, ABC): + """ + Base class for Tornado streaming routes. + """ + + @override + def prepare(self): + # Perform authentication + if self.auth_required: + auth_status = get_auth_status(self.request) + if auth_status != AuthStatus.OK: + self.send_error(auth_status.value.code, error=auth_status.value.message) + return + + logger.info( + 'Client %s connected to %s', self.request.remote_ip, self.request.path + ) + + @override + def write_error(self, status_code: int, error: Optional[str] = None, **_): + self.set_header("Content-Type", "application/json") + self.finish( + json.dumps( + {"status": status_code, "error": error or responses[status_code]} + ) + ) + + @classmethod + @abstractmethod + def path(cls) -> str: + raise NotImplementedError() + + @property + def auth_required(self): + return True diff --git a/platypush/backend/http/app/routes/plugins/camera/ir/__init__.py b/platypush/backend/http/app/streaming/plugins/__init__.py similarity index 100% rename from platypush/backend/http/app/routes/plugins/camera/ir/__init__.py rename to platypush/backend/http/app/streaming/plugins/__init__.py diff --git a/platypush/backend/http/app/streaming/plugins/camera.py b/platypush/backend/http/app/streaming/plugins/camera.py new file mode 100644 index 000000000..ccc978c2f --- /dev/null +++ b/platypush/backend/http/app/streaming/plugins/camera.py @@ -0,0 +1,134 @@ +from enum import Enum +import json +from logging import getLogger +from typing import Optional +from typing_extensions import override + +from tornado.web import stream_request_body +from platypush.context import get_plugin + +from platypush.plugins.camera import Camera, CameraPlugin, StreamWriter + +from .. import StreamingRoute + +logger = getLogger(__name__) + + +class RequestType(Enum): + """ + Models the camera route request type (video or photo) + """ + + UNKNOWN = '' + PHOTO = 'photo' + VIDEO = 'video' + + +@stream_request_body +class CameraRoute(StreamingRoute): + """ + Route for camera streams. + """ + + def __init__(self, *args, **kwargs): + # TODO Support multiple concurrent requests + super().__init__(*args, **kwargs) + self._camera: Optional[Camera] = None + self._request_type = RequestType.UNKNOWN + self._extension: str = '' + + @override + @classmethod + def path(cls) -> str: + return r"/camera/([a-zA-Z0-9_./]+)/([a-zA-Z0-9_]+)\.?([a-zA-Z0-9_]+)?" + + def _get_camera(self, plugin: str) -> CameraPlugin: + plugin_name = f'camera.{plugin.replace("/", ".")}' + p = get_plugin(plugin_name) + assert p, f'No such plugin: {plugin_name}' + return p + + def _get_frame( + self, camera: Camera, timeout: Optional[float] = None + ) -> Optional[bytes]: + if camera.stream: + with camera.stream.ready: + camera.stream.ready.wait(timeout=timeout) + return camera.stream.frame + + def _should_stop(self): + if self._finished: + return True + + if self.request.connection and getattr(self.request.connection, 'stream', None): + return self.request.connection.stream.closed() # type: ignore + + return True + + def send_feed(self, camera: Camera): + while not self._should_stop(): + frame = self._get_frame(camera, timeout=5.0) + if frame: + self.write(frame) + self.flush() + + def send_frame(self, camera: Camera): + frame = None + for _ in range(camera.info.warmup_frames): + frame = self._get_frame(camera) + + if frame: + self.write(frame) + self.flush() + + def _set_request_type_and_extension(self, route: str, extension: str): + if route in {'photo', 'frame'}: + self._request_type = RequestType.PHOTO + if extension == 'jpg': + extension = 'jpeg' + self._extension = extension or 'jpeg' + elif route in {'video', 'feed'}: + self._request_type = RequestType.VIDEO + self._extension = extension or 'mjpeg' + + def _get_args(self, kwargs: dict): + kwargs = {k: v[0].decode() for k, v in kwargs.items() if k != 't'} + for k, v in kwargs.items(): + if k == 'resolution': + v = json.loads(f'[{v}]') + else: + try: + v = int(v) + except (ValueError, TypeError): + try: + v = float(v) + except (ValueError, TypeError): + pass + + kwargs[k] = v + + return kwargs + + def get(self, plugin: str, route: str, extension: str = '') -> None: + self._set_request_type_and_extension(route, extension) + if not (self._request_type and self._extension): + self.write_error(404, 'Not Found') + return + + stream_class = StreamWriter.get_class_by_name(self._extension) + camera = self._get_camera(plugin) + self.set_header('Content-Type', stream_class.mimetype) + + with camera.open( + stream=True, + stream_format=self._extension, + frames_dir=None, + **self._get_args(self.request.arguments), + ) as session: + camera.start_camera(session) + if self._request_type == RequestType.PHOTO: + self.send_frame(session) + elif self._request_type == RequestType.VIDEO: + self.send_feed(session) + + self.finish() diff --git a/platypush/backend/http/app/utils/__init__.py b/platypush/backend/http/app/utils/__init__.py index 08eae3446..cad44b269 100644 --- a/platypush/backend/http/app/utils/__init__.py +++ b/platypush/backend/http/app/utils/__init__.py @@ -13,6 +13,7 @@ from .routes import ( get_remote_base_url, get_routes, ) +from .streaming import get_streaming_routes from .ws import get_ws_routes __all__ = [ @@ -27,6 +28,7 @@ __all__ = [ 'get_message_response', 'get_remote_base_url', 'get_routes', + 'get_streaming_routes', 'get_ws_routes', 'logger', 'send_message', diff --git a/platypush/backend/http/app/utils/streaming.py b/platypush/backend/http/app/utils/streaming.py new file mode 100644 index 000000000..02a9dd990 --- /dev/null +++ b/platypush/backend/http/app/utils/streaming.py @@ -0,0 +1,39 @@ +import os +import importlib +import inspect +from typing import List, Type + +import pkgutil + +from ..streaming import StreamingRoute, logger + + +def get_streaming_routes() -> List[Type[StreamingRoute]]: + """ + Scans for streaming routes. + """ + from platypush.backend.http import HttpBackend + + base_pkg = '.'.join([HttpBackend.__module__, 'app', 'streaming']) + base_dir = os.path.join( + os.path.dirname(inspect.getfile(HttpBackend)), 'app', 'streaming' + ) + routes = [] + + for _, mod_name, _ in pkgutil.walk_packages([base_dir], prefix=base_pkg + '.'): + try: + module = importlib.import_module(mod_name) + except Exception as e: + logger.warning('Could not import module %s', mod_name) + logger.exception(e) + continue + + for _, obj in inspect.getmembers(module): + if ( + inspect.isclass(obj) + and not inspect.isabstract(obj) + and issubclass(obj, StreamingRoute) + ): + routes.append(obj) + + return routes diff --git a/platypush/plugins/camera/__init__.py b/platypush/plugins/camera/__init__.py index 8019abf8b..cb0dff796 100644 --- a/platypush/plugins/camera/__init__.py +++ b/platypush/plugins/camera/__init__.py @@ -834,7 +834,7 @@ class CameraPlugin(Plugin, ABC): if device: return self._status(device) - return {id: self._status(device) for id, camera in self._devices.items()} + return {id: self._status(id) for id in self._devices} @staticmethod def transform_frame(frame, color_transform):