diff --git a/platypush/backend/http/app/streaming/plugins/camera.py b/platypush/backend/http/app/streaming/plugins/camera.py index 847674613..d3b128ba1 100644 --- a/platypush/backend/http/app/streaming/plugins/camera.py +++ b/platypush/backend/http/app/streaming/plugins/camera.py @@ -8,6 +8,7 @@ from tornado.web import stream_request_body from platypush.context import get_plugin from platypush.plugins.camera import Camera, CameraPlugin, StreamWriter +from platypush.utils import get_plugin_name_by_class from .. import StreamingRoute @@ -30,6 +31,8 @@ class CameraRoute(StreamingRoute): Route for camera streams. """ + _redis_queue_prefix = '_platypush/camera' + def __init__(self, *args, **kwargs): # TODO Support multiple concurrent requests super().__init__(*args, **kwargs) @@ -67,9 +70,16 @@ class CameraRoute(StreamingRoute): return True - def send_feed(self, camera: Camera): - while not self._should_stop(): - frame = self._get_frame(camera, timeout=5.0) + def send_feed(self, camera: CameraPlugin): + redis_queue = self._get_redis_queue_by_camera(camera) + for msg in self.listen(): + if self._should_stop(): + break + + if msg.channel != redis_queue: + continue + + frame = msg.data if frame: self.write(frame) self.flush() @@ -111,6 +121,21 @@ class CameraRoute(StreamingRoute): return kwargs + @classmethod + def _get_redis_queue_by_camera(cls, camera: CameraPlugin) -> str: + plugin_name = get_plugin_name_by_class(camera.__class__) + assert plugin_name, f'No such plugin: {plugin_name}' + return '/'.join( + [ + cls._redis_queue_prefix, + plugin_name, + *map( + str, + [camera.camera_info.device] if camera.camera_info.device else [], + ), + ] + ) + def get(self, plugin: str, route: str, extension: str = '') -> None: self._set_request_type_and_extension(route, extension) if not (self._request_type and self._extension): @@ -119,18 +144,21 @@ class CameraRoute(StreamingRoute): stream_class = StreamWriter.get_class_by_name(self._extension) camera = self._get_camera(plugin) + redis_queue = self._get_redis_queue_by_camera(camera) self.set_header('Content-Type', stream_class.mimetype) + self.subscribe(redis_queue) with camera.open( stream=True, stream_format=self._extension, frames_dir=None, + redis_queue=redis_queue, **self._get_args(self.request.arguments), ) as session: camera.start_camera(session) if self._request_type == RequestType.PHOTO: self.send_frame(session) elif self._request_type == RequestType.VIDEO: - self.send_feed(session) + self.send_feed(camera) self.finish() diff --git a/platypush/plugins/camera/__init__.py b/platypush/plugins/camera/__init__.py index cb0dff796..f78246c60 100644 --- a/platypush/plugins/camera/__init__.py +++ b/platypush/plugins/camera/__init__.py @@ -194,7 +194,11 @@ class CameraPlugin(Plugin, ABC): return merged_info def open_device( - self, device: Optional[Union[int, str]] = None, stream: bool = False, **params + self, + device: Optional[Union[int, str]], + stream: bool = False, + redis_queue: Optional[str] = None, + **params, ) -> Camera: """ Initialize and open a device. @@ -231,7 +235,9 @@ class CameraPlugin(Plugin, ABC): if stream: writer_class = StreamWriter.get_class_by_name(camera.info.stream_format) - camera.stream = writer_class(camera=camera, plugin=self) + camera.stream = writer_class( + camera=camera, plugin=self, redis_queue=redis_queue + ) if camera.info.frames_dir: pathlib.Path( @@ -275,19 +281,27 @@ class CameraPlugin(Plugin, ABC): @contextmanager def open( - self, device: Optional[Union[int, str]] = None, stream: bool = None, **info + self, + device: Optional[Union[int, str]] = None, + stream: bool = None, + redis_queue: Optional[str] = None, + **info, ) -> Generator[Camera, None, None]: """ Initialize and open a device using a context manager pattern. :param device: Capture device by name, path or ID. :param stream: If set, the frames will be streamed to ``camera.stream``. + :param redis_queue: If set, the frames will be streamed to + ``redis_queue``. :param info: Camera parameters override - see constructors parameters. :return: The initialized :class:`platypush.plugins.camera.Camera` object. """ camera = None try: - camera = self.open_device(device, stream=stream, **info) + camera = self.open_device( + device, stream=stream, redis_queue=redis_queue, **info + ) yield camera finally: self.close_device(camera) diff --git a/platypush/plugins/camera/model/writer/__init__.py b/platypush/plugins/camera/model/writer/__init__.py index 1113dcf10..a24d780f7 100644 --- a/platypush/plugins/camera/model/writer/__init__.py +++ b/platypush/plugins/camera/model/writer/__init__.py @@ -9,6 +9,8 @@ from typing import Optional, IO from PIL.Image import Image +from platypush.utils import get_redis + class VideoWriter(ABC): """ @@ -71,12 +73,19 @@ class StreamWriter(VideoWriter, ABC): Abstract class for camera streaming operations. """ - def __init__(self, *args, sock: Optional[IO] = None, **kwargs): + def __init__( + self, + *args, + sock: Optional[IO] = None, + redis_queue: Optional[str] = None, + **kwargs, + ): super().__init__(*args, **kwargs) self.frame: Optional[bytes] = None self.frame_time: Optional[float] = None self.buffer = io.BytesIO() self.ready = multiprocessing.Condition() + self.redis_queue = redis_queue self.sock = sock def write(self, image: Image): @@ -103,6 +112,9 @@ class StreamWriter(VideoWriter, ABC): self.logger.info('Client connection closed') self.close() + if self.redis_queue: + get_redis().publish(self.redis_queue, data) + @abstractmethod def encode(self, image: Image) -> bytes: """