From 1c40e5e843a91f7cc6c8d5846fde8caec316294f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 23 May 2023 20:42:59 +0200 Subject: [PATCH] Black'd the camera plugin and writer. Also, proper fix for the multi-inheritance problem of the ffmpeg writers. --- platypush/plugins/camera/__init__.py | 361 +++++++++++++----- .../plugins/camera/model/writer/__init__.py | 18 +- .../plugins/camera/model/writer/ffmpeg.py | 99 +++-- 3 files changed, 347 insertions(+), 131 deletions(-) diff --git a/platypush/plugins/camera/__init__.py b/platypush/plugins/camera/__init__.py index 65f667ee08..8019abf8b2 100644 --- a/platypush/plugins/camera/__init__.py +++ b/platypush/plugins/camera/__init__.py @@ -10,21 +10,39 @@ from contextlib import contextmanager from datetime import datetime from multiprocessing import Process from queue import Queue -from typing import Optional, Union, Dict, Tuple, IO +from typing import Generator, Optional, Union, Dict, Tuple, IO from platypush.config import Config -from platypush.message.event.camera import CameraRecordingStartedEvent, CameraPictureTakenEvent, \ - CameraRecordingStoppedEvent, CameraVideoRenderedEvent +from platypush.message.event.camera import ( + CameraRecordingStartedEvent, + CameraPictureTakenEvent, + CameraRecordingStoppedEvent, + CameraVideoRenderedEvent, +) from platypush.plugins import Plugin, action from platypush.plugins.camera.model.camera import CameraInfo, Camera -from platypush.plugins.camera.model.exceptions import CameraException, CaptureAlreadyRunningException +from platypush.plugins.camera.model.exceptions import ( + CameraException, + CaptureAlreadyRunningException, +) from platypush.plugins.camera.model.writer import VideoWriter, StreamWriter from platypush.plugins.camera.model.writer.ffmpeg import FFmpegFileWriter -from platypush.plugins.camera.model.writer.preview import PreviewWriter, PreviewWriterFactory +from platypush.plugins.camera.model.writer.preview import ( + PreviewWriter, + PreviewWriterFactory, +) from platypush.utils import get_plugin_name_by_class -__all__ = ['Camera', 'CameraInfo', 'CameraException', 'CameraPlugin', 'CaptureAlreadyRunningException', - 'VideoWriter', 'StreamWriter', 'PreviewWriter'] +__all__ = [ + 'Camera', + 'CameraInfo', + 'CameraException', + 'CameraPlugin', + 'CaptureAlreadyRunningException', + 'VideoWriter', + 'StreamWriter', + 'PreviewWriter', +] class CameraPlugin(Plugin, ABC): @@ -66,15 +84,32 @@ class CameraPlugin(Plugin, ABC): _camera_info_class = CameraInfo _video_writer_class = FFmpegFileWriter - def __init__(self, device: Optional[Union[int, str]] = None, resolution: Tuple[int, int] = (640, 480), - frames_dir: Optional[str] = None, warmup_frames: int = 5, warmup_seconds: Optional[float] = 0., - capture_timeout: Optional[float] = 20.0, scale_x: Optional[float] = None, - scale_y: Optional[float] = None, rotate: Optional[float] = None, grayscale: Optional[bool] = None, - color_transform: Optional[Union[int, str]] = None, fps: float = 16, horizontal_flip: bool = False, - vertical_flip: bool = False, input_format: Optional[str] = None, output_format: Optional[str] = None, - stream_format: str = 'mjpeg', listen_port: Optional[int] = 5000, bind_address: str = '0.0.0.0', - ffmpeg_bin: str = 'ffmpeg', input_codec: Optional[str] = None, output_codec: Optional[str] = None, - **kwargs): + def __init__( + self, + device: Optional[Union[int, str]] = None, + resolution: Tuple[int, int] = (640, 480), + frames_dir: Optional[str] = None, + warmup_frames: int = 5, + warmup_seconds: Optional[float] = 0.0, + capture_timeout: Optional[float] = 20.0, + scale_x: Optional[float] = None, + scale_y: Optional[float] = None, + rotate: Optional[float] = None, + grayscale: Optional[bool] = None, + color_transform: Optional[Union[int, str]] = None, + fps: float = 16, + horizontal_flip: bool = False, + vertical_flip: bool = False, + input_format: Optional[str] = None, + output_format: Optional[str] = None, + stream_format: str = 'mjpeg', + listen_port: Optional[int] = 5000, + bind_address: str = '0.0.0.0', + ffmpeg_bin: str = 'ffmpeg', + input_codec: Optional[str] = None, + output_codec: Optional[str] = None, + **kwargs, + ): """ :param device: Identifier of the default capturing device. :param resolution: Default resolution, as a tuple of two integers. @@ -117,22 +152,38 @@ class CameraPlugin(Plugin, ABC): """ super().__init__(**kwargs) - self.workdir = os.path.join(Config.get('workdir'), get_plugin_name_by_class(self)) + self.workdir = os.path.join( + Config.get('workdir'), get_plugin_name_by_class(self) + ) pathlib.Path(self.workdir).mkdir(mode=0o755, exist_ok=True, parents=True) # noinspection PyArgumentList - self.camera_info = self._camera_info_class(device, color_transform=color_transform, warmup_frames=warmup_frames, - warmup_seconds=warmup_seconds, rotate=rotate, scale_x=scale_x, - scale_y=scale_y, capture_timeout=capture_timeout, fps=fps, - input_format=input_format, output_format=output_format, - stream_format=stream_format, resolution=resolution, - grayscale=grayscale, listen_port=listen_port, - horizontal_flip=horizontal_flip, vertical_flip=vertical_flip, - ffmpeg_bin=ffmpeg_bin, input_codec=input_codec, - output_codec=output_codec, bind_address=bind_address, - frames_dir=os.path.abspath( - os.path.expanduser(frames_dir or - os.path.join(self.workdir, 'frames')))) + self.camera_info = self._camera_info_class( + device, + color_transform=color_transform, + warmup_frames=warmup_frames, + warmup_seconds=warmup_seconds, + rotate=rotate, + scale_x=scale_x, + scale_y=scale_y, + capture_timeout=capture_timeout, + fps=fps, + input_format=input_format, + output_format=output_format, + stream_format=stream_format, + resolution=resolution, + grayscale=grayscale, + listen_port=listen_port, + horizontal_flip=horizontal_flip, + vertical_flip=vertical_flip, + ffmpeg_bin=ffmpeg_bin, + input_codec=input_codec, + output_codec=output_codec, + bind_address=bind_address, + frames_dir=os.path.abspath( + os.path.expanduser(frames_dir or os.path.join(self.workdir, 'frames')) + ), + ) self._devices: Dict[Union[int, str], Camera] = {} self._streams: Dict[Union[int, str], Camera] = {} @@ -142,7 +193,9 @@ class CameraPlugin(Plugin, ABC): merged_info.set(**info) return merged_info - def open_device(self, device: Optional[Union[int, str]] = None, stream: bool = False, **params) -> Camera: + def open_device( + self, device: Optional[Union[int, str]] = None, stream: bool = False, **params + ) -> Camera: """ Initialize and open a device. @@ -160,7 +213,11 @@ class CameraPlugin(Plugin, ABC): assert device is not None, 'No device specified/configured' if device in self._devices: camera = self._devices[device] - if camera.capture_thread and camera.capture_thread.is_alive() and camera.start_event.is_set(): + if ( + camera.capture_thread + and camera.capture_thread.is_alive() + and camera.start_event.is_set() + ): raise CaptureAlreadyRunningException(device) camera.start_event.clear() @@ -177,8 +234,9 @@ class CameraPlugin(Plugin, ABC): camera.stream = writer_class(camera=camera, plugin=self) if camera.info.frames_dir: - pathlib.Path(os.path.abspath(os.path.expanduser(camera.info.frames_dir))).mkdir( - mode=0o755, exist_ok=True, parents=True) + pathlib.Path( + os.path.abspath(os.path.expanduser(camera.info.frames_dir)) + ).mkdir(mode=0o755, exist_ok=True, parents=True) self._devices[device] = camera return camera @@ -205,15 +263,20 @@ class CameraPlugin(Plugin, ABC): :param camera: Camera object. ``camera.info.capture_timeout`` is used as a capture thread termination timeout if set. """ - if camera.capture_thread and camera.capture_thread.is_alive() and \ - threading.get_ident() != camera.capture_thread.ident: + if ( + camera.capture_thread + and camera.capture_thread.is_alive() + and threading.get_ident() != camera.capture_thread.ident + ): try: camera.capture_thread.join(timeout=camera.info.capture_timeout) except Exception as e: - self.logger.warning('Error on FFmpeg capture wait: {}'.format(str(e))) + self.logger.warning('Error on FFmpeg capture wait: %s', e) @contextmanager - def open(self, device: Optional[Union[int, str]] = None, stream: bool = None, **info) -> Camera: + def open( + self, device: Optional[Union[int, str]] = None, stream: bool = None, **info + ) -> Generator[Camera, None, None]: """ Initialize and open a device using a context manager pattern. @@ -267,6 +330,7 @@ class CameraPlugin(Plugin, ABC): :param format: Output format. """ from PIL import Image + if isinstance(frame, bytes): frame = list(frame) elif not isinstance(frame, Image.Image): @@ -278,16 +342,28 @@ class CameraPlugin(Plugin, ABC): frame.save(filepath, **save_args) - def _store_frame(self, frame, frames_dir: Optional[str] = None, image_file: Optional[str] = None, - *args, **kwargs) -> str: + def _store_frame( + self, + frame, + frames_dir: Optional[str] = None, + image_file: Optional[str] = None, + *args, + **kwargs, + ) -> str: """ :meth:`.store_frame` wrapper. """ if image_file: filepath = os.path.abspath(os.path.expanduser(image_file)) else: - filepath = os.path.abspath(os.path.expanduser( - os.path.join(frames_dir or '', datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f.jpg')))) + filepath = os.path.abspath( + os.path.expanduser( + os.path.join( + frames_dir or '', + datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f.jpg'), + ) + ) + ) pathlib.Path(filepath).parent.mkdir(mode=0o755, exist_ok=True, parents=True) self.store_frame(frame, filepath, *args, **kwargs) @@ -295,7 +371,9 @@ class CameraPlugin(Plugin, ABC): def start_preview(self, camera: Camera): if camera.preview and not camera.preview.closed: - self.logger.info('A preview window is already active on device {}'.format(camera.info.device)) + self.logger.info( + 'A preview window is already active on device %s', camera.info.device + ) return camera.preview = PreviewWriterFactory.get(camera, self) @@ -315,7 +393,9 @@ class CameraPlugin(Plugin, ABC): camera.preview = None - def frame_processor(self, frame_queue: Queue, camera: Camera, image_file: Optional[str] = None): + def frame_processor( + self, frame_queue: Queue, camera: Camera, image_file: Optional[str] = None + ): while True: frame = frame_queue.get() if frame is None: @@ -326,18 +406,31 @@ class CameraPlugin(Plugin, ABC): frame = self.to_grayscale(frame) frame = self.rotate_frame(frame, camera.info.rotate) - frame = self.flip_frame(frame, camera.info.horizontal_flip, camera.info.vertical_flip) + frame = self.flip_frame( + frame, camera.info.horizontal_flip, camera.info.vertical_flip + ) frame = self.scale_frame(frame, camera.info.scale_x, camera.info.scale_y) for output in camera.get_outputs(): output.write(frame) if camera.info.frames_dir or image_file: - self._store_frame(frame=frame, frames_dir=camera.info.frames_dir, image_file=image_file) + self._store_frame( + frame=frame, + frames_dir=camera.info.frames_dir, + image_file=image_file, + ) - def capturing_thread(self, camera: Camera, duration: Optional[float] = None, video_file: Optional[str] = None, - image_file: Optional[str] = None, n_frames: Optional[int] = None, preview: bool = False, - **kwargs): + def capturing_thread( + self, + camera: Camera, + duration: Optional[float] = None, + video_file: Optional[str] = None, + image_file: Optional[str] = None, + n_frames: Optional[int] = None, + preview: bool = False, + **kwargs, + ): """ Camera capturing thread. @@ -366,25 +459,36 @@ class CameraPlugin(Plugin, ABC): if duration and camera.info.warmup_seconds: duration = duration + camera.info.warmup_seconds if video_file: - camera.file_writer = self._video_writer_class(camera=camera, plugin=self, output_file=video_file) + camera.file_writer = self._video_writer_class( + camera=camera, plugin=self, output_file=video_file + ) frame_queue = Queue() - frame_processor = threading.Thread(target=self.frame_processor, - kwargs=dict(frame_queue=frame_queue, camera=camera, image_file=image_file)) + frame_processor = threading.Thread( + target=self.frame_processor, + kwargs={ + 'frame_queue': frame_queue, + 'camera': camera, + 'image_file': image_file, + }, + ) frame_processor.start() self.fire_event(CameraRecordingStartedEvent(**evt_args)) try: while camera.start_event.is_set(): - if (duration and time.time() - recording_started_time >= duration) \ - or (n_frames and captured_frames >= n_frames): + if (duration and time.time() - recording_started_time >= duration) or ( + n_frames and captured_frames >= n_frames + ): break frame_capture_start = time.time() try: frame = self.capture_frame(camera, **kwargs) if not frame: - self.logger.warning('Invalid frame received, terminating the capture session') + self.logger.warning( + 'Invalid frame received, terminating the capture session' + ) break frame_queue.put(frame) @@ -392,12 +496,20 @@ class CameraPlugin(Plugin, ABC): self.logger.warning(str(e)) continue - if not n_frames or not camera.info.warmup_seconds or \ - (time.time() - recording_started_time >= camera.info.warmup_seconds): + if ( + not n_frames + or not camera.info.warmup_seconds + or ( + time.time() - recording_started_time + >= camera.info.warmup_seconds + ) + ): captured_frames += 1 if camera.info.fps: - wait_time = (1. / camera.info.fps) - (time.time() - frame_capture_start) + wait_time = (1.0 / camera.info.fps) - ( + time.time() - frame_capture_start + ) if wait_time > 0: time.sleep(wait_time) finally: @@ -407,7 +519,7 @@ class CameraPlugin(Plugin, ABC): try: output.close() except Exception as e: - self.logger.warning('Could not close camera output: {}'.format(str(e))) + self.logger.warning('Could not close camera output: %s', e) self.close_device(camera, wait_capture=False) frame_processor.join(timeout=5.0) @@ -426,17 +538,26 @@ class CameraPlugin(Plugin, ABC): :param camera: An initialized :class:`platypush.plugins.camera.Camera` object. :param preview: Show a preview of the camera frames. """ - assert not (camera.capture_thread and camera.capture_thread.is_alive()), \ - 'A capture session is already in progress' + assert not ( + camera.capture_thread and camera.capture_thread.is_alive() + ), 'A capture session is already in progress' - camera.capture_thread = threading.Thread(target=self.capturing_thread, args=(camera, *args), - kwargs={'preview': preview, **kwargs}) + camera.capture_thread = threading.Thread( + target=self.capturing_thread, + args=(camera, *args), + kwargs={'preview': preview, **kwargs}, + ) camera.capture_thread.start() camera.start_event.set() @action - def capture_video(self, duration: Optional[float] = None, video_file: Optional[str] = None, preview: bool = False, - **camera) -> Union[str, dict]: + def capture_video( + self, + duration: Optional[float] = None, + video_file: Optional[str] = None, + preview: bool = False, + **camera, + ) -> Union[str, dict]: """ Capture a video. @@ -448,8 +569,14 @@ class CameraPlugin(Plugin, ABC): to the recorded resource. Otherwise, it will return the status of the camera device after starting it. """ camera = self.open_device(**camera) - self.start_camera(camera, duration=duration, video_file=video_file, frames_dir=None, image_file=None, - preview=preview) + self.start_camera( + camera, + duration=duration, + video_file=video_file, + frames_dir=None, + image_file=None, + preview=preview, + ) if duration: self.wait_capture(camera) @@ -484,8 +611,12 @@ class CameraPlugin(Plugin, ABC): """ with self.open(**camera) as camera: - warmup_frames = camera.info.warmup_frames if camera.info.warmup_frames else 1 - self.start_camera(camera, image_file=image_file, n_frames=warmup_frames, preview=preview) + warmup_frames = ( + camera.info.warmup_frames if camera.info.warmup_frames else 1 + ) + self.start_camera( + camera, image_file=image_file, n_frames=warmup_frames, preview=preview + ) self.wait_capture(camera) return image_file @@ -504,8 +635,13 @@ class CameraPlugin(Plugin, ABC): return self.capture_image(image_file, **camera) @action - def capture_sequence(self, duration: Optional[float] = None, n_frames: Optional[int] = None, preview: bool = False, - **camera) -> str: + def capture_sequence( + self, + duration: Optional[float] = None, + n_frames: Optional[int] = None, + preview: bool = False, + **camera, + ) -> str: """ Capture a sequence of frames from a camera and store them to a directory. @@ -517,12 +653,16 @@ class CameraPlugin(Plugin, ABC): :return: The directory where the image files have been stored. """ with self.open(**camera) as camera: - self.start_camera(camera, duration=duration, n_frames=n_frames, preview=preview) + self.start_camera( + camera, duration=duration, n_frames=n_frames, preview=preview + ) self.wait_capture(camera) return camera.info.frames_dir @action - def capture_preview(self, duration: Optional[float] = None, n_frames: Optional[int] = None, **camera) -> dict: + def capture_preview( + self, duration: Optional[float] = None, n_frames: Optional[int] = None, **camera + ) -> dict: """ Start a camera preview session. @@ -539,10 +679,12 @@ class CameraPlugin(Plugin, ABC): def _prepare_server_socket(camera: Camera) -> socket.socket: server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - server_socket.bind(( # lgtm [py/bind-socket-all-network-interfaces] - camera.info.bind_address or '0.0.0.0', - camera.info.listen_port - )) + server_socket.bind( + ( # lgtm [py/bind-socket-all-network-interfaces] + camera.info.bind_address or '0.0.0.0', + camera.info.listen_port, + ) + ) server_socket.listen(1) server_socket.settimeout(1) return server_socket @@ -550,16 +692,18 @@ class CameraPlugin(Plugin, ABC): def _accept_client(self, server_socket: socket.socket) -> Optional[IO]: try: sock = server_socket.accept()[0] - self.logger.info('Accepted client connection from {}'.format(sock.getpeername())) + self.logger.info('Accepted client connection from %s', sock.getpeername()) return sock.makefile('wb') except socket.timeout: return - def streaming_thread(self, camera: Camera, stream_format: str, duration: Optional[float] = None): + def streaming_thread( + self, camera: Camera, stream_format: str, duration: Optional[float] = None + ): streaming_started_time = time.time() server_socket = self._prepare_server_socket(camera) sock = None - self.logger.info('Starting streaming on port {}'.format(camera.info.listen_port)) + self.logger.info('Starting streaming on port %s', camera.info.listen_port) try: while camera.stream_event.is_set(): @@ -576,7 +720,9 @@ class CameraPlugin(Plugin, ABC): camera = self.open_device(stream=True, **info) camera.stream.sock = sock - self.start_camera(camera, duration=duration, frames_dir=None, image_file=None) + self.start_camera( + camera, duration=duration, frames_dir=None, image_file=None + ) finally: self._cleanup_stream(camera, server_socket, sock) self.logger.info('Stopped camera stream') @@ -586,21 +732,23 @@ class CameraPlugin(Plugin, ABC): try: client.close() except Exception as e: - self.logger.warning('Error on client socket close: {}'.format(str(e))) + self.logger.warning('Error on client socket close: %s', e) try: server_socket.close() except Exception as e: - self.logger.warning('Error on server socket close: {}'.format(str(e))) + self.logger.warning('Error on server socket close: %s', e) if camera.stream: try: camera.stream.close() except Exception as e: - self.logger.warning('Error while closing the encoding stream: {}'.format(str(e))) + self.logger.warning('Error while closing the encoding stream: %s', e) @action - def start_streaming(self, duration: Optional[float] = None, stream_format: str = 'mkv', **camera) -> dict: + def start_streaming( + self, duration: Optional[float] = None, stream_format: str = 'mkv', **camera + ) -> dict: """ Expose the video stream of a camera over a TCP connection. @@ -612,16 +760,25 @@ class CameraPlugin(Plugin, ABC): camera = self.open_device(stream=True, stream_format=stream_format, **camera) return self._start_streaming(camera, duration, stream_format) - def _start_streaming(self, camera: Camera, duration: Optional[float], stream_format: str): + def _start_streaming( + self, camera: Camera, duration: Optional[float], stream_format: str + ): assert camera.info.listen_port, 'No listen_port specified/configured' - assert not camera.stream_event.is_set() and camera.info.device not in self._streams, \ - 'A streaming session is already running for device {}'.format(camera.info.device) + assert ( + not camera.stream_event.is_set() and camera.info.device not in self._streams + ), f'A streaming session is already running for device {camera.info.device}' self._streams[camera.info.device] = camera camera.stream_event.set() - camera.stream_thread = threading.Thread(target=self.streaming_thread, kwargs=dict( - camera=camera, duration=duration, stream_format=stream_format)) + camera.stream_thread = threading.Thread( + target=self.streaming_thread, + kwargs={ + 'camera': camera, + 'duration': duration, + 'stream_format': stream_format, + }, + ) camera.stream_thread.start() return self.status(camera.info.device) @@ -655,10 +812,17 @@ class CameraPlugin(Plugin, ABC): return { **camera.info.to_dict(), - 'active': True if camera.capture_thread and camera.capture_thread.is_alive() else False, - 'capturing': True if camera.capture_thread and camera.capture_thread.is_alive() and - camera.start_event.is_set() else False, - 'streaming': camera.stream_thread and camera.stream_thread.is_alive() and camera.stream_event.is_set(), + 'active': bool(camera.capture_thread and camera.capture_thread.is_alive()), + 'capturing': bool( + camera.capture_thread + and camera.capture_thread.is_alive() + and camera.start_event.is_set() + ), + 'streaming': ( + camera.stream_thread + and camera.stream_thread.is_alive() + and camera.stream_event.is_set() + ), } @action @@ -670,10 +834,7 @@ class CameraPlugin(Plugin, ABC): if device: return self._status(device) - return { - id: self._status(device) - for id, camera in self._devices.items() - } + return {id: self._status(device) for id, camera in self._devices.items()} @staticmethod def transform_frame(frame, color_transform): @@ -690,6 +851,7 @@ class CameraPlugin(Plugin, ABC): :param frame: Image frame (default: a ``PIL.Image`` object). """ from PIL import ImageOps + return ImageOps.grayscale(frame) @staticmethod @@ -724,7 +886,9 @@ class CameraPlugin(Plugin, ABC): return frame @staticmethod - def scale_frame(frame, scale_x: Optional[float] = None, scale_y: Optional[float] = None): + def scale_frame( + frame, scale_x: Optional[float] = None, scale_y: Optional[float] = None + ): """ Frame scaling logic. The default implementation assumes that frame is a ``PIL.Image`` object. @@ -733,6 +897,7 @@ class CameraPlugin(Plugin, ABC): :param scale_y: Y-scale factor. """ from PIL import Image + if not (scale_x and scale_y) or (scale_x == 1 and scale_y == 1): return frame diff --git a/platypush/plugins/camera/model/writer/__init__.py b/platypush/plugins/camera/model/writer/__init__.py index dbadfa03b0..0fb8592552 100644 --- a/platypush/plugins/camera/model/writer/__init__.py +++ b/platypush/plugins/camera/model/writer/__init__.py @@ -49,7 +49,7 @@ class VideoWriter(ABC): """ return self - def __exit__(self, exc_type, exc_val, exc_tb): + def __exit__(self, *_, **__): """ Context manager-based interface. """ @@ -60,8 +60,9 @@ class FileVideoWriter(VideoWriter, ABC): """ Abstract class to handle frames-to-video file operations. """ + def __init__(self, *args, output_file: str, **kwargs): - VideoWriter.__init__(self, *args, **kwargs) + super().__init__(self, *args, **kwargs) self.output_file = os.path.abspath(os.path.expanduser(output_file)) @@ -69,8 +70,9 @@ class StreamWriter(VideoWriter, ABC): """ Abstract class for camera streaming operations. """ + def __init__(self, *args, sock: Optional[IO] = None, **kwargs): - VideoWriter.__init__(self, *args, **kwargs) + super().__init__(*args, **kwargs) self.frame: Optional[bytes] = None self.frame_time: Optional[float] = None self.buffer = io.BytesIO() @@ -117,16 +119,20 @@ class StreamWriter(VideoWriter, ABC): try: self.sock.close() except Exception as e: - self.logger.warning('Could not close camera resource: {}'.format(str(e))) + self.logger.warning('Could not close camera resource: %s', e) super().close() @staticmethod def get_class_by_name(name: str): from platypush.plugins.camera.model.writer.index import StreamHandlers + name = name.upper() - assert hasattr(StreamHandlers, name), 'No such stream handler: {}. Supported types: {}'.format( - name, [hndl.name for hndl in list(StreamHandlers)]) + assert hasattr( + StreamHandlers, name + ), f'No such stream handler: {name}. Supported types: ' + ( + ', '.join([hndl.name for hndl in list(StreamHandlers)]) + ) return getattr(StreamHandlers, name).value diff --git a/platypush/plugins/camera/model/writer/ffmpeg.py b/platypush/plugins/camera/model/writer/ffmpeg.py index fe0c77502b..41011e3935 100644 --- a/platypush/plugins/camera/model/writer/ffmpeg.py +++ b/platypush/plugins/camera/model/writer/ffmpeg.py @@ -8,7 +8,11 @@ from typing import Optional, Tuple from PIL.Image import Image from platypush.plugins.camera.model.camera import Camera -from platypush.plugins.camera.model.writer import VideoWriter, FileVideoWriter, StreamWriter +from platypush.plugins.camera.model.writer import ( + VideoWriter, + FileVideoWriter, + StreamWriter, +) class FFmpegWriter(VideoWriter, ABC): @@ -16,9 +20,17 @@ class FFmpegWriter(VideoWriter, ABC): Generic FFmpeg encoder for camera frames. """ - def __init__(self, *args, input_file: str = '-', input_format: str = 'rawvideo', output_file: str = '-', - output_format: Optional[str] = None, pix_fmt: Optional[str] = None, - output_opts: Optional[Tuple] = None, **kwargs): + def __init__( + self, + *args, + input_file: str = '-', + input_format: str = 'rawvideo', + output_file: str = '-', + output_format: Optional[str] = None, + pix_fmt: Optional[str] = None, + output_opts: Optional[Tuple] = None, + **kwargs, + ): super().__init__(*args, **kwargs) self.input_file = input_file @@ -29,21 +41,34 @@ class FFmpegWriter(VideoWriter, ABC): self.pix_fmt = pix_fmt self.output_opts = output_opts or () - self.logger.info('Starting FFmpeg. Command: {}'.format(' '.join(self.ffmpeg_args))) - self.ffmpeg = subprocess.Popen(self.ffmpeg_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self.logger.info('Starting FFmpeg. Command: ' + ' '.join(self.ffmpeg_args)) + self.ffmpeg = subprocess.Popen( + self.ffmpeg_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE + ) @property def ffmpeg_args(self): - return [self.camera.info.ffmpeg_bin, '-y', - '-f', self.input_format, - *(('-pix_fmt', self.pix_fmt) if self.pix_fmt else ()), - '-s', '{}x{}'.format(self.width, self.height), - '-r', str(self.camera.info.fps), - '-i', self.input_file, - *(('-f', self.output_format) if self.output_format else ()), - *self.output_opts, - *(('-vcodec', self.camera.info.output_codec) if self.camera.info.output_codec else ()), - self.output_file] + return [ + self.camera.info.ffmpeg_bin, + '-y', + '-f', + self.input_format, + *(('-pix_fmt', self.pix_fmt) if self.pix_fmt else ()), + '-s', + f'{self.width}x{self.height}', + '-r', + str(self.camera.info.fps), + '-i', + self.input_file, + *(('-f', self.output_format) if self.output_format else ()), + *self.output_opts, + *( + ('-vcodec', self.camera.info.output_codec) + if self.camera.info.output_codec + else () + ), + self.output_file, + ] def is_closed(self): return self.closed or not self.ffmpeg or self.ffmpeg.poll() is not None @@ -55,7 +80,7 @@ class FFmpegWriter(VideoWriter, ABC): try: self.ffmpeg.stdin.write(image.convert('RGB').tobytes()) except Exception as e: - self.logger.warning('FFmpeg send error: {}'.format(str(e))) + self.logger.warning('FFmpeg send error: %s', e) self.close() def close(self): @@ -63,7 +88,7 @@ class FFmpegWriter(VideoWriter, ABC): if self.ffmpeg and self.ffmpeg.stdin: try: self.ffmpeg.stdin.close() - except (IOError, OSError): + except OSError: pass if self.ffmpeg: @@ -77,7 +102,7 @@ class FFmpegWriter(VideoWriter, ABC): if self.ffmpeg and self.ffmpeg.stdout: try: self.ffmpeg.stdout.close() - except (IOError, OSError): + except OSError: pass self.ffmpeg = None @@ -98,10 +123,26 @@ class FFmpegStreamWriter(StreamWriter, FFmpegWriter, ABC): Stream camera frames using FFmpeg. """ - def __init__(self, *args, output_format: str, output_opts: Optional[Tuple] = None, **kwargs): - super().__init__(*args, pix_fmt='rgb24', output_format=output_format, output_opts=output_opts or ( - '-tune', 'zerolatency', '-preset', 'superfast', '-trellis', '0', - '-fflags', 'nobuffer'), **kwargs) + def __init__( + self, *args, output_format: str, output_opts: Optional[Tuple] = None, **kwargs + ): + super().__init__( + *args, + pix_fmt='rgb24', + output_format=output_format, + output_opts=output_opts + or ( + '-tune', + 'zerolatency', + '-preset', + 'superfast', + '-trellis', + '0', + '-fflags', + 'nobuffer', + ), + **kwargs, + ) self._reader = threading.Thread(target=self._reader_thread) self._reader.start() @@ -115,7 +156,7 @@ class FFmpegStreamWriter(StreamWriter, FFmpegWriter, ABC): try: data = self.ffmpeg.stdout.read(1 << 15) except Exception as e: - self.logger.warning('FFmpeg reader error: {}'.format(str(e))) + self.logger.warning('FFmpeg reader error: %s', e) break if not data: @@ -123,7 +164,7 @@ class FFmpegStreamWriter(StreamWriter, FFmpegWriter, ABC): if self.frame is None: latency = time.time() - start_time - self.logger.info('FFmpeg stream latency: {} secs'.format(latency)) + self.logger.info('FFmpeg stream latency: %d secs', latency) with self.ready: self.frame = data @@ -140,12 +181,16 @@ class FFmpegStreamWriter(StreamWriter, FFmpegWriter, ABC): try: self.ffmpeg.stdin.write(data) except Exception as e: - self.logger.warning('FFmpeg send error: {}'.format(str(e))) + self.logger.warning('FFmpeg send error: %s', e) self.close() def close(self): super().close() - if self._reader and self._reader.is_alive() and threading.get_ident() != self._reader.ident: + if ( + self._reader + and self._reader.is_alive() + and threading.get_ident() != self._reader.ident + ): self._reader.join(timeout=5.0) self._reader = None