diff --git a/platypush/message/response/camera/__init__.py b/platypush/message/response/camera/__init__.py new file mode 100644 index 000000000..f584452f9 --- /dev/null +++ b/platypush/message/response/camera/__init__.py @@ -0,0 +1,8 @@ +from platypush.message.response import Response + + +class CameraResponse(Response): + pass + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/response/camera/android.py b/platypush/message/response/camera/android.py new file mode 100644 index 000000000..1d0481340 --- /dev/null +++ b/platypush/message/response/camera/android.py @@ -0,0 +1,180 @@ +from typing import List + +from platypush.message.response.camera import CameraResponse + + +class AndroidCameraStatusResponse(CameraResponse): + """ + Sample response:: + + ..code-block:: json + + { + "orientation": "landscape", + "idle": "off", + "audio_only": "off", + "overlay": "off", + "quality": "49", + "focus_homing": "off", + "ip_address": "192.168.1.30", + "motion_limit": "250", + "adet_limit": "200", + "night_vision": "off", + "night_vision_average": "2", + "night_vision_gain": "1.0", + "motion_detect": "off", + "motion_display": "off", + "video_chunk_len": "60", + "gps_active": "off", + "video_size": "1920x1080", + "mirror_flip": "none", + "ffc": "off", + "rtsp_video_formats": "", + "rtsp_audio_formats": "", + "video_connections": "0", + "audio_connections": "0", + "ivideon_streaming": "off", + "zoom": "100", + "crop_x": "50", + "crop_y": "50", + "coloreffect": "none", + "scenemode": "auto", + "focusmode": "continuous-video", + "whitebalance": "auto", + "flashmode": "off", + "antibanding": "off", + "torch": "off", + "focus_distance": "0.0", + "focal_length": "4.25", + "aperture": "1.7", + "filter_density": "0.0", + "exposure_ns": "9384", + "frame_duration": "33333333", + "iso": "100", + "manual_sensor": "off", + "photo_size": "1920x1080" + } + + """ + + attrs = { + 'orientation', 'idle', 'audio_only', 'overlay', 'quality', 'focus_homing', + 'ip_address', 'motion_limit', 'adet_limit', 'night_vision', + 'night_vision_average', 'night_vision_gain', 'motion_detect', + 'motion_display', 'video_chunk_len', 'gps_active', 'video_size', + 'mirror_flip', 'ffc', 'rtsp_video_formats', 'rtsp_audio_formats', + 'video_connections', 'audio_connections', 'ivideon_streaming', 'zoom', + 'crop_x', 'crop_y', 'coloreffect', 'scenemode', 'focusmode', + 'whitebalance', 'flashmode', 'antibanding', 'torch', 'focus_distance', + 'focal_length', 'aperture', 'filter_density', 'exposure_ns', + 'frame_duration', 'iso', 'manual_sensor', 'photo_size', + } + + def __init__(self, *args, + orientation: str = None, + idle: str = None, + audio_only: str = None, + overlay: str = None, + quality: str = None, + focus_homing: str = None, + ip_address: str = None, + motion_limit: str = None, + adet_limit: str = None, + night_vision: str = None, + night_vision_average: str = None, + night_vision_gain: str = None, + motion_detect: str = None, + motion_display: str = None, + video_chunk_len: str = None, + gps_active: str = None, + video_size: str = None, + mirror_flip: str = None, + ffc: str = None, + rtsp_video_formats: str = None, + rtsp_audio_formats: str = None, + video_connections: str = None, + audio_connections: str = None, + ivideon_streaming: str = None, + zoom: str = None, + crop_x: str = None, + crop_y: str = None, + coloreffect: str = None, + scenemode: str = None, + focusmode: str = None, + whitebalance: str = None, + flashmode: str = None, + antibanding: str = None, + torch: str = None, + focus_distance: str = None, + focal_length: str = None, + aperture: str = None, + filter_density: str = None, + exposure_ns: str = None, + frame_duration: str = None, + iso: str = None, + manual_sensor: str = None, + photo_size: str = None, + **kwargs): + + self.status = { + "orientation": orientation, + "idle": True if idle == "on" else False, + "audio_only": True if audio_only == "on" else False, + "overlay": True if overlay == "on" else False, + "quality": int(quality or 0), + "focus_homing": True if focus_homing == "on" else False, + "ip_address": ip_address, + "motion_limit": int(motion_limit or 0), + "adet_limit": int(adet_limit or 0), + "night_vision": True if night_vision == "on" else False, + "night_vision_average": float(night_vision_average or 0), + "night_vision_gain": float(night_vision_gain or 0), + "motion_detect": True if motion_detect == "on" else False, + "motion_display": True if motion_display == "on" else False, + "video_chunk_len": int(video_chunk_len or 0), + "gps_active": True if gps_active == "on" else False, + "video_size": video_size, + "mirror_flip": mirror_flip, + "ffc": True if ffc == "on" else False, + "rtsp_video_formats": rtsp_video_formats, + "rtsp_audio_formats": rtsp_audio_formats, + "video_connections": int(video_connections or 0), + "audio_connections": int(audio_connections or 0), + "ivideon_streaming": True if ivideon_streaming == "on" else False, + "zoom": int(zoom or 0), + "crop_x": int(crop_x or 0), + "crop_y": int(crop_y or 0), + "coloreffect": coloreffect, + "scenemode": scenemode, + "focusmode": focusmode, + "whitebalance": whitebalance, + "flashmode": True if flashmode == "on" else False, + "antibanding": True if antibanding == "on" else False, + "torch": True if torch == "on" else False, + "focus_distance": float(focus_distance or 0), + "focal_length": float(focal_length or 0), + "aperture": float(aperture or 0), + "filter_density": float(filter_density or 0), + "exposure_ns": int(exposure_ns or 0), + "frame_duration": int(frame_duration or 0), + "iso": int(iso or 0), + "manual_sensor": True if manual_sensor == "on" else False, + "photo_size": photo_size, + } + + super().__init__(*args, output=self.status, **kwargs) + + +class AndroidCameraStatusListResponse(CameraResponse): + def __init__(self, status: List[AndroidCameraStatusResponse], **kwargs): + self.status = [s.status for s in status] + super().__init__(output=self.status, **kwargs) + + +class AndroidCameraPictureResponse(CameraResponse): + def __init__(self, image_file: str, *args, **kwargs): + self.image_file = image_file + super().__init__(*args, output={'image_file': image_file}, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index 2ac59461d..470507c43 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -6,6 +6,7 @@ from platypush.event import EventGenerator from platypush.message.response import Response from platypush.utils import get_decorators + def action(f): @wraps(f) def _execute_action(*args, **kwargs): diff --git a/platypush/plugins/camera/android/__init__.py b/platypush/plugins/camera/android/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/platypush/plugins/camera/android/ipcam.py b/platypush/plugins/camera/android/ipcam.py new file mode 100644 index 000000000..5d812da97 --- /dev/null +++ b/platypush/plugins/camera/android/ipcam.py @@ -0,0 +1,268 @@ +import json +import os +import requests + +from requests.auth import HTTPBasicAuth +from typing import Optional, Union, Dict, List, Any + +from platypush.message.response.camera.android import AndroidCameraStatusResponse, AndroidCameraStatusListResponse, \ + AndroidCameraPictureResponse +from platypush.plugins import Plugin, action + + +class AndroidIpcam: + args = {} + + def __init__(self, name: str, host: str, port: int = 8080, username: Optional[str] = None, + password: Optional[str] = None, timeout: int = 10, ssl: bool = True): + self.args = { + 'name': name, + 'host': host, + 'port': port, + 'username': username, + 'password': password, + 'timeout': timeout, + 'ssl': ssl, + } + + self.auth = None + if username: + self.auth = HTTPBasicAuth(self.username, self.password) + + def __getattr__(self, item): + if item in self.args: + return super().__getattribute__('args').get(item) + return super().__getattribute__(item) + + def __setattr__(self, key, value): + if key not in self.args: + super().__setattr__(key, value) + else: + self.args[key] = value + + def __str__(self): + return json.dumps(getattr(self, 'args') or {}) + + @property + def base_url(self) -> str: + return 'http{ssl}://{host}:{port}/'.format( + ssl=('s' if self.ssl else ''), host=self.host, port=self.port) + + @property + def stream_url(self) -> str: + return self.base_url + 'video' + + @property + def audio_url(self) -> str: + return self.base_url + 'audio.wav' + + @property + def image_url(self) -> str: + return self.base_url + 'photo.jpg' + + +class CameraAndroidIpcamPlugin(Plugin): + """ + Plugin to control remote Android cameras over + `IPCam `_. + """ + + def __init__(self, + host: Optional[str] = None, + port: Optional[int] = 8080, + username: Optional[str] = None, + password: Optional[str] = None, + timeout: int = 10, + ssl: bool = True, + cameras: Optional[Dict[str, Dict[str, Any]]] = None, + **kwargs): + """ + :param host: Camera host name or address + :param port: Camera port + :param username: Camera username, if set + :param password: Camera password, if set + :param timeout: Connection timeout + :param ssl: Use HTTPS instead of HTTP + :param cameras: Alternatively, you can specify a list of IPCam cameras as a + name->dict mapping. The keys will be unique names used to identify your + cameras, the values will contain dictionaries containing `host, `port`, + `username`, `password`, `timeout` and `ssl` attributes for each camera. + """ + super().__init__(**kwargs) + self.cameras: List[AndroidIpcam] = [] + self._camera_name_to_idx: Dict[str, int] = {} + + if not cameras: + camera = AndroidIpcam(name=host, host=host, port=port, username=username, + password=password, timeout=timeout, ssl=ssl) + self.cameras.append(camera) + self._camera_name_to_idx[host] = 0 + else: + for name, camera in cameras.items(): + camera = AndroidIpcam(name=name, host=camera['host'], port=camera.get('port', port), + username=camera.get('username'), password=camera.get('password'), + timeout=camera.get('timeout', timeout), ssl=camera.get('ssl', ssl)) + self._camera_name_to_idx[name] = len(self.cameras) + self.cameras.append(camera) + + def _get_camera(self, camera: Union[int, str] = None) -> AndroidIpcam: + if not camera: + camera = 0 + + if isinstance(camera, int): + return self.cameras[camera] + + return self.cameras[self._camera_name_to_idx[camera]] + + def _exec(self, url: str, camera: Union[int, str] = None, *args, **kwargs) -> Union[Dict[str, Any], bool]: + cam = self._get_camera(camera) + url = cam.base_url + url + response = requests.get(url, auth=cam.auth, timeout=cam.timeout, verify=False, *args, **kwargs) + response.raise_for_status() + + if response.headers.get('content-type') == 'application/json': + return response.json() + + return response.text.find('Ok') != -1 + + @action + def change_setting(self, key: str, value: Union[str, int, bool], camera: Union[int, str] = None) -> bool: + """ + Change a setting. + :param key: Setting name + :param value: Setting value + :param camera: Camera index or configured name + :return: True on success, False otherwise + """ + if isinstance(value, bool): + payload = "on" if value else "off" + else: + payload = value + + return self._exec("settings/{key}?set={payload}".format(key=key, payload=payload), camera=camera) + + @action + def status(self, camera: Union[int, str] = None) -> AndroidCameraStatusListResponse: + """ + :param camera: Camera index or name (default: status of all the cameras) + :return: True if the camera is available, False otherwise + """ + cameras = self._camera_name_to_idx.keys() if camera is None else [camera] + status = [] + + for cam in cameras: + try: + status_data = self._exec('status.json', params={'show_avail': 1}, camera=cam).get('curvals', {}) + status.append(AndroidCameraStatusResponse( + **{k: v for k, v in status_data.items() + if k in AndroidCameraStatusResponse.attrs}) + ) + except Exception as e: + self.logger.warning('Could not get the status of {}: {}'.format(cam, str(e))) + + return AndroidCameraStatusListResponse(status) + + @action + def set_front_facing_camera(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable the front-facing camera.""" + return self.change_setting('ffc', activate, camera=camera) + + @action + def set_torch(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable the torch.""" + url = 'enabletorch' if activate else 'disabletorch' + return self._exec(url, camera=camera) + + @action + def set_focus(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable the focus.""" + url = 'focus' if activate else 'nofocus' + return self._exec(url, camera=camera) + + @action + def start_recording(self, tag: Optional[str] = None, camera: Union[int, str] = None) -> bool: + """Start recording.""" + params = {'force': 1} + if tag: + params['tag'] = tag + + return self._exec('startvideo', params=params, camera=camera) + + @action + def stop_recording(self, camera: Union[int, str] = None) -> bool: + """Stop recording.""" + return self._exec('stopvideo', params={'force': 1}, camera=camera) + + @action + def take_picture(self, image_file: str, camera: Union[int, str] = None) -> AndroidCameraPictureResponse: + """Take a picture and save it on the local device.""" + cam = self._get_camera(camera) + image_file = os.path.abspath(os.path.expanduser(image_file)) + os.makedirs(os.path.dirname(image_file), exist_ok=True) + response = requests.get(cam.image_url, auth=cam.auth, verify=False) + response.raise_for_status() + + with open(image_file, 'wb') as f: + f.write(response.content) + return AndroidCameraPictureResponse(image_file=image_file) + + @action + def set_night_vision(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable night vision.""" + return self.change_setting('night_vision', activate, camera=camera) + + @action + def set_overlay(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable video overlay.""" + return self.change_setting('overlay', activate, camera=camera) + + @action + def set_gps(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable GPS.""" + return self.change_setting('gps_active', activate, camera=camera) + + @action + def set_quality(self, quality: int = 100, camera: Union[int, str] = None) -> bool: + """Set video quality.""" + return self.change_setting('quality', int(quality), camera=camera) + + @action + def set_motion_detect(self, activate: bool = True, camera: Union[int, str] = None) -> bool: + """Enable/disable motion detect.""" + return self.change_setting('motion_detect', activate, camera=camera) + + @action + def set_orientation(self, orientation: str = 'landscape', camera: Union[int, str] = None) -> bool: + """Set video orientation.""" + return self.change_setting('orientation', orientation, camera=camera) + + @action + def set_zoom(self, zoom: float, camera: Union[int, str] = None) -> bool: + """Set the zoom level.""" + return self._exec('settings/ptz', params={'zoom': float(zoom)}, camera=camera) + + @action + def set_scenemode(self, scenemode: str = 'auto', camera: Union[int, str] = None) -> bool: + """Set video orientation.""" + return self.change_setting('scenemode', scenemode, camera=camera) + + @action + def get_stream_url(self, camera: Union[int, str] = None) -> str: + """ Get the streaming URL for the specified camera. """ + cam = self._get_camera(camera) + return cam.stream_url + + @action + def get_image_url(self, camera: Union[int, str] = None) -> str: + """ Get the URL to the camera static picture. """ + cam = self._get_camera(camera) + return cam.image_url + + @action + def get_audio_url(self, camera: Union[int, str] = None) -> str: + """ Get the URL to the camera audio source. """ + cam = self._get_camera(camera) + return cam.audio_url + + +# vim:sw=4:ts=4:et: