Added Android IPCam support

This commit is contained in:
Fabio Manganiello 2019-12-17 00:56:28 +01:00
parent 0cef9c6070
commit 66d00ee428
5 changed files with 457 additions and 0 deletions

View File

@ -0,0 +1,8 @@
from platypush.message.response import Response
class CameraResponse(Response):
pass
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,180 @@
from typing import List
from platypush.message.response.camera import CameraResponse
class AndroidCameraStatusResponse(CameraResponse):
"""
Sample response::
..code-block:: json
{
"orientation": "landscape",
"idle": "off",
"audio_only": "off",
"overlay": "off",
"quality": "49",
"focus_homing": "off",
"ip_address": "192.168.1.30",
"motion_limit": "250",
"adet_limit": "200",
"night_vision": "off",
"night_vision_average": "2",
"night_vision_gain": "1.0",
"motion_detect": "off",
"motion_display": "off",
"video_chunk_len": "60",
"gps_active": "off",
"video_size": "1920x1080",
"mirror_flip": "none",
"ffc": "off",
"rtsp_video_formats": "",
"rtsp_audio_formats": "",
"video_connections": "0",
"audio_connections": "0",
"ivideon_streaming": "off",
"zoom": "100",
"crop_x": "50",
"crop_y": "50",
"coloreffect": "none",
"scenemode": "auto",
"focusmode": "continuous-video",
"whitebalance": "auto",
"flashmode": "off",
"antibanding": "off",
"torch": "off",
"focus_distance": "0.0",
"focal_length": "4.25",
"aperture": "1.7",
"filter_density": "0.0",
"exposure_ns": "9384",
"frame_duration": "33333333",
"iso": "100",
"manual_sensor": "off",
"photo_size": "1920x1080"
}
"""
attrs = {
'orientation', 'idle', 'audio_only', 'overlay', 'quality', 'focus_homing',
'ip_address', 'motion_limit', 'adet_limit', 'night_vision',
'night_vision_average', 'night_vision_gain', 'motion_detect',
'motion_display', 'video_chunk_len', 'gps_active', 'video_size',
'mirror_flip', 'ffc', 'rtsp_video_formats', 'rtsp_audio_formats',
'video_connections', 'audio_connections', 'ivideon_streaming', 'zoom',
'crop_x', 'crop_y', 'coloreffect', 'scenemode', 'focusmode',
'whitebalance', 'flashmode', 'antibanding', 'torch', 'focus_distance',
'focal_length', 'aperture', 'filter_density', 'exposure_ns',
'frame_duration', 'iso', 'manual_sensor', 'photo_size',
}
def __init__(self, *args,
orientation: str = None,
idle: str = None,
audio_only: str = None,
overlay: str = None,
quality: str = None,
focus_homing: str = None,
ip_address: str = None,
motion_limit: str = None,
adet_limit: str = None,
night_vision: str = None,
night_vision_average: str = None,
night_vision_gain: str = None,
motion_detect: str = None,
motion_display: str = None,
video_chunk_len: str = None,
gps_active: str = None,
video_size: str = None,
mirror_flip: str = None,
ffc: str = None,
rtsp_video_formats: str = None,
rtsp_audio_formats: str = None,
video_connections: str = None,
audio_connections: str = None,
ivideon_streaming: str = None,
zoom: str = None,
crop_x: str = None,
crop_y: str = None,
coloreffect: str = None,
scenemode: str = None,
focusmode: str = None,
whitebalance: str = None,
flashmode: str = None,
antibanding: str = None,
torch: str = None,
focus_distance: str = None,
focal_length: str = None,
aperture: str = None,
filter_density: str = None,
exposure_ns: str = None,
frame_duration: str = None,
iso: str = None,
manual_sensor: str = None,
photo_size: str = None,
**kwargs):
self.status = {
"orientation": orientation,
"idle": True if idle == "on" else False,
"audio_only": True if audio_only == "on" else False,
"overlay": True if overlay == "on" else False,
"quality": int(quality or 0),
"focus_homing": True if focus_homing == "on" else False,
"ip_address": ip_address,
"motion_limit": int(motion_limit or 0),
"adet_limit": int(adet_limit or 0),
"night_vision": True if night_vision == "on" else False,
"night_vision_average": float(night_vision_average or 0),
"night_vision_gain": float(night_vision_gain or 0),
"motion_detect": True if motion_detect == "on" else False,
"motion_display": True if motion_display == "on" else False,
"video_chunk_len": int(video_chunk_len or 0),
"gps_active": True if gps_active == "on" else False,
"video_size": video_size,
"mirror_flip": mirror_flip,
"ffc": True if ffc == "on" else False,
"rtsp_video_formats": rtsp_video_formats,
"rtsp_audio_formats": rtsp_audio_formats,
"video_connections": int(video_connections or 0),
"audio_connections": int(audio_connections or 0),
"ivideon_streaming": True if ivideon_streaming == "on" else False,
"zoom": int(zoom or 0),
"crop_x": int(crop_x or 0),
"crop_y": int(crop_y or 0),
"coloreffect": coloreffect,
"scenemode": scenemode,
"focusmode": focusmode,
"whitebalance": whitebalance,
"flashmode": True if flashmode == "on" else False,
"antibanding": True if antibanding == "on" else False,
"torch": True if torch == "on" else False,
"focus_distance": float(focus_distance or 0),
"focal_length": float(focal_length or 0),
"aperture": float(aperture or 0),
"filter_density": float(filter_density or 0),
"exposure_ns": int(exposure_ns or 0),
"frame_duration": int(frame_duration or 0),
"iso": int(iso or 0),
"manual_sensor": True if manual_sensor == "on" else False,
"photo_size": photo_size,
}
super().__init__(*args, output=self.status, **kwargs)
class AndroidCameraStatusListResponse(CameraResponse):
def __init__(self, status: List[AndroidCameraStatusResponse], **kwargs):
self.status = [s.status for s in status]
super().__init__(output=self.status, **kwargs)
class AndroidCameraPictureResponse(CameraResponse):
def __init__(self, image_file: str, *args, **kwargs):
self.image_file = image_file
super().__init__(*args, output={'image_file': image_file}, **kwargs)
# vim:sw=4:ts=4:et:

View File

@ -6,6 +6,7 @@ from platypush.event import EventGenerator
from platypush.message.response import Response
from platypush.utils import get_decorators
def action(f):
@wraps(f)
def _execute_action(*args, **kwargs):

View File

@ -0,0 +1,268 @@
import json
import os
import requests
from requests.auth import HTTPBasicAuth
from typing import Optional, Union, Dict, List, Any
from platypush.message.response.camera.android import AndroidCameraStatusResponse, AndroidCameraStatusListResponse, \
AndroidCameraPictureResponse
from platypush.plugins import Plugin, action
class AndroidIpcam:
args = {}
def __init__(self, name: str, host: str, port: int = 8080, username: Optional[str] = None,
password: Optional[str] = None, timeout: int = 10, ssl: bool = True):
self.args = {
'name': name,
'host': host,
'port': port,
'username': username,
'password': password,
'timeout': timeout,
'ssl': ssl,
}
self.auth = None
if username:
self.auth = HTTPBasicAuth(self.username, self.password)
def __getattr__(self, item):
if item in self.args:
return super().__getattribute__('args').get(item)
return super().__getattribute__(item)
def __setattr__(self, key, value):
if key not in self.args:
super().__setattr__(key, value)
else:
self.args[key] = value
def __str__(self):
return json.dumps(getattr(self, 'args') or {})
@property
def base_url(self) -> str:
return 'http{ssl}://{host}:{port}/'.format(
ssl=('s' if self.ssl else ''), host=self.host, port=self.port)
@property
def stream_url(self) -> str:
return self.base_url + 'video'
@property
def audio_url(self) -> str:
return self.base_url + 'audio.wav'
@property
def image_url(self) -> str:
return self.base_url + 'photo.jpg'
class CameraAndroidIpcamPlugin(Plugin):
"""
Plugin to control remote Android cameras over
`IPCam <https://play.google.com/store/apps/details?id=com.pas.webcam>`_.
"""
def __init__(self,
host: Optional[str] = None,
port: Optional[int] = 8080,
username: Optional[str] = None,
password: Optional[str] = None,
timeout: int = 10,
ssl: bool = True,
cameras: Optional[Dict[str, Dict[str, Any]]] = None,
**kwargs):
"""
:param host: Camera host name or address
:param port: Camera port
:param username: Camera username, if set
:param password: Camera password, if set
:param timeout: Connection timeout
:param ssl: Use HTTPS instead of HTTP
:param cameras: Alternatively, you can specify a list of IPCam cameras as a
name->dict mapping. The keys will be unique names used to identify your
cameras, the values will contain dictionaries containing `host, `port`,
`username`, `password`, `timeout` and `ssl` attributes for each camera.
"""
super().__init__(**kwargs)
self.cameras: List[AndroidIpcam] = []
self._camera_name_to_idx: Dict[str, int] = {}
if not cameras:
camera = AndroidIpcam(name=host, host=host, port=port, username=username,
password=password, timeout=timeout, ssl=ssl)
self.cameras.append(camera)
self._camera_name_to_idx[host] = 0
else:
for name, camera in cameras.items():
camera = AndroidIpcam(name=name, host=camera['host'], port=camera.get('port', port),
username=camera.get('username'), password=camera.get('password'),
timeout=camera.get('timeout', timeout), ssl=camera.get('ssl', ssl))
self._camera_name_to_idx[name] = len(self.cameras)
self.cameras.append(camera)
def _get_camera(self, camera: Union[int, str] = None) -> AndroidIpcam:
if not camera:
camera = 0
if isinstance(camera, int):
return self.cameras[camera]
return self.cameras[self._camera_name_to_idx[camera]]
def _exec(self, url: str, camera: Union[int, str] = None, *args, **kwargs) -> Union[Dict[str, Any], bool]:
cam = self._get_camera(camera)
url = cam.base_url + url
response = requests.get(url, auth=cam.auth, timeout=cam.timeout, verify=False, *args, **kwargs)
response.raise_for_status()
if response.headers.get('content-type') == 'application/json':
return response.json()
return response.text.find('Ok') != -1
@action
def change_setting(self, key: str, value: Union[str, int, bool], camera: Union[int, str] = None) -> bool:
"""
Change a setting.
:param key: Setting name
:param value: Setting value
:param camera: Camera index or configured name
:return: True on success, False otherwise
"""
if isinstance(value, bool):
payload = "on" if value else "off"
else:
payload = value
return self._exec("settings/{key}?set={payload}".format(key=key, payload=payload), camera=camera)
@action
def status(self, camera: Union[int, str] = None) -> AndroidCameraStatusListResponse:
"""
:param camera: Camera index or name (default: status of all the cameras)
:return: True if the camera is available, False otherwise
"""
cameras = self._camera_name_to_idx.keys() if camera is None else [camera]
status = []
for cam in cameras:
try:
status_data = self._exec('status.json', params={'show_avail': 1}, camera=cam).get('curvals', {})
status.append(AndroidCameraStatusResponse(
**{k: v for k, v in status_data.items()
if k in AndroidCameraStatusResponse.attrs})
)
except Exception as e:
self.logger.warning('Could not get the status of {}: {}'.format(cam, str(e)))
return AndroidCameraStatusListResponse(status)
@action
def set_front_facing_camera(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable the front-facing camera."""
return self.change_setting('ffc', activate, camera=camera)
@action
def set_torch(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable the torch."""
url = 'enabletorch' if activate else 'disabletorch'
return self._exec(url, camera=camera)
@action
def set_focus(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable the focus."""
url = 'focus' if activate else 'nofocus'
return self._exec(url, camera=camera)
@action
def start_recording(self, tag: Optional[str] = None, camera: Union[int, str] = None) -> bool:
"""Start recording."""
params = {'force': 1}
if tag:
params['tag'] = tag
return self._exec('startvideo', params=params, camera=camera)
@action
def stop_recording(self, camera: Union[int, str] = None) -> bool:
"""Stop recording."""
return self._exec('stopvideo', params={'force': 1}, camera=camera)
@action
def take_picture(self, image_file: str, camera: Union[int, str] = None) -> AndroidCameraPictureResponse:
"""Take a picture and save it on the local device."""
cam = self._get_camera(camera)
image_file = os.path.abspath(os.path.expanduser(image_file))
os.makedirs(os.path.dirname(image_file), exist_ok=True)
response = requests.get(cam.image_url, auth=cam.auth, verify=False)
response.raise_for_status()
with open(image_file, 'wb') as f:
f.write(response.content)
return AndroidCameraPictureResponse(image_file=image_file)
@action
def set_night_vision(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable night vision."""
return self.change_setting('night_vision', activate, camera=camera)
@action
def set_overlay(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable video overlay."""
return self.change_setting('overlay', activate, camera=camera)
@action
def set_gps(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable GPS."""
return self.change_setting('gps_active', activate, camera=camera)
@action
def set_quality(self, quality: int = 100, camera: Union[int, str] = None) -> bool:
"""Set video quality."""
return self.change_setting('quality', int(quality), camera=camera)
@action
def set_motion_detect(self, activate: bool = True, camera: Union[int, str] = None) -> bool:
"""Enable/disable motion detect."""
return self.change_setting('motion_detect', activate, camera=camera)
@action
def set_orientation(self, orientation: str = 'landscape', camera: Union[int, str] = None) -> bool:
"""Set video orientation."""
return self.change_setting('orientation', orientation, camera=camera)
@action
def set_zoom(self, zoom: float, camera: Union[int, str] = None) -> bool:
"""Set the zoom level."""
return self._exec('settings/ptz', params={'zoom': float(zoom)}, camera=camera)
@action
def set_scenemode(self, scenemode: str = 'auto', camera: Union[int, str] = None) -> bool:
"""Set video orientation."""
return self.change_setting('scenemode', scenemode, camera=camera)
@action
def get_stream_url(self, camera: Union[int, str] = None) -> str:
""" Get the streaming URL for the specified camera. """
cam = self._get_camera(camera)
return cam.stream_url
@action
def get_image_url(self, camera: Union[int, str] = None) -> str:
""" Get the URL to the camera static picture. """
cam = self._get_camera(camera)
return cam.image_url
@action
def get_audio_url(self, camera: Union[int, str] = None) -> str:
""" Get the URL to the camera audio source. """
cam = self._get_camera(camera)
return cam.audio_url
# vim:sw=4:ts=4:et: