platypush/platypush/plugins/camera/model/camera.py

93 lines
2.9 KiB
Python

import math
import threading
from dataclasses import asdict, dataclass
from typing import Optional, Union, Tuple, Set
import numpy as np
from platypush.plugins.camera.model.writer import (
StreamWriter,
VideoWriter,
FileVideoWriter,
)
from platypush.plugins.camera.model.writer.preview import PreviewWriter
@dataclass
class CameraInfo:
device: Optional[Union[int, str]]
bind_address: Optional[str] = None
capture_timeout: float = 0
color_transform: Optional[Union[int, str]] = None
ffmpeg_bin: Optional[str] = None
fps: Optional[float] = None
frames_dir: Optional[str] = None
grayscale: Optional[bool] = None
horizontal_flip: bool = False
input_codec: Optional[str] = None
input_format: Optional[str] = None
listen_port: Optional[int] = None
output_codec: Optional[str] = None
output_format: Optional[str] = None
resolution: Optional[Tuple[int, int]] = None
rotate: Optional[float] = None
scale_x: Optional[float] = None
scale_y: Optional[float] = None
stream_format: Optional[str] = None
vertical_flip: bool = False
warmup_frames: int = 0
warmup_seconds: float = 0
def set(self, **kwargs):
for k, v in kwargs.items():
if hasattr(self, k):
setattr(self, k, v)
def clone(self):
return self.__class__(**asdict(self))
@dataclass
class Camera:
info: CameraInfo
start_event: threading.Event = threading.Event()
stream_event: threading.Event = threading.Event()
capture_thread: Optional[threading.Thread] = None
stream_thread: Optional[threading.Thread] = None
object = None
stream: Optional[StreamWriter] = None
preview: Optional[PreviewWriter] = None
file_writer: Optional[FileVideoWriter] = None
def get_outputs(self) -> Set[VideoWriter]:
writers = set()
# if self.preview and self.preview.is_alive():
if self.preview and not self.preview.closed:
writers.add(self.preview)
if self.stream and not self.stream.closed:
writers.add(self.stream)
if self.file_writer and not self.file_writer.closed:
writers.add(self.file_writer)
return writers
def effective_resolution(self) -> Tuple[int, int]:
"""
Calculates the effective resolution of the camera in pixels, taking
into account the base resolution, the scale and the rotation.
"""
assert self.info.resolution, 'No base resolution specified'
rot = (self.info.rotate or 0) * math.pi / 180
sin = math.sin(rot)
cos = math.cos(rot)
scale = np.array([[self.info.scale_x or 1.0, self.info.scale_y or 1.0]])
resolution = np.array([[self.info.resolution[0], self.info.resolution[1]]])
rot_matrix = np.array([[sin, cos], [cos, sin]])
resolution = (scale * abs(np.cross(rot_matrix, resolution)))[0]
return int(round(resolution[0])), int(round(resolution[1]))
# vim:sw=4:ts=4:et: