# platypush/platypush/plugins/sensor/distance/vl53l1x/__init__.py
#
# 176 lines
# 5.1 KiB
# Python

from contextlib import contextmanager
from threading import RLock
from typing import List, Mapping
from typing_extensions import override
from platypush.context import get_bus
from platypush.entities.managers.sensors import SensorEntityManager
from platypush.entities.devices import Device
from platypush.entities.distance import DistanceSensor
from platypush.message.event.sensor import SensorDataChangeEvent
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_plugin_name_by_class
class SensorDistanceVl53l1xPlugin(RunnablePlugin, SensorEntityManager):
    """
    Plugin to interact with a `VL53L1X
    <https://www.st.com/en/imaging-and-photonics-solutions/vl53l1x.html>`_
    laser ranger/distance sensor.

    Requires:

        * ``smbus2`` (``pip install smbus2``)
        * ``vl53l1x`` (``pip install vl53l1x``)

    Triggers:

        * :class:`platypush.message.event.sensor.SensorDataChangeEvent`

    """

    def __init__(self, i2c_bus=1, i2c_address=0x29, poll_interval=3, **kwargs):
        """
        :param i2c_bus: I2C bus number (default: 1)
        :param i2c_address: I2C address (default: 0x29)
        :param poll_interval: How often the integration should poll for new
            measurements (default: 3 seconds).
        """
        super().__init__(poll_interval=poll_interval, **kwargs)
        self.i2c_bus = i2c_bus
        self.i2c_address = i2c_address
        # Handle to the currently opened sensor, or None when it is closed.
        self._device = None
        # Re-entrant, so that code already holding the device can ask for it
        # again from the same thread without deadlocking.
        self._device_lock = RLock()
        # Last set of measurements collected by the polling loop.
        self._last_data = {}

    @contextmanager
    def _get_device(self, ranging=1):
        """
        Context manager that yields an opened, ranging sensor handle.

        If a device is already open (i.e. this is a nested call from the
        thread that holds the lock), the existing handle is yielded as-is and
        the ``ranging`` argument is ignored; the outermost caller remains
        responsible for closing it. Otherwise a new handle is opened, started
        in the requested ranging mode, and torn down on exit.

        :param ranging: Ranging mode passed to ``start_ranging`` (in this
            plugin: 1=short, 2=medium, 3=long).
        """
        # Imported lazily so that the module can be loaded even when the
        # optional dependency is not installed.
        from VL53L1X import VL53L1X  # type: ignore

        with self._device_lock:
            if self._device:
                yield self._device
                # A @contextmanager generator must yield exactly once:
                # without this return the code below would open a second
                # device and yield again, raising RuntimeError on exit.
                return

            device = VL53L1X(i2c_bus=self.i2c_bus, i2c_address=self.i2c_address)
            self._device = device

            try:
                device.open()
                device.start_ranging(ranging)
                yield device
            finally:
                try:
                    try:
                        device.stop_ranging()
                    except Exception as e:
                        # Best-effort: ranging may never have started.
                        self.logger.debug('Could not stop ranging: %s', e)

                    device.close()
                finally:
                    # Always forget the handle, even if close() failed, so
                    # that the next call opens a fresh device instead of
                    # reusing a stale one.
                    self._device = None

    @action
    @override
    def status(self, *_, **__):
        """
        Publish and return the measurements last collected by the polling
        loop. This does not trigger a new reading from the sensor.

        :returns: dict mapping the range name (``short``, ``medium``,
            ``long``) to the last measured distance.
        """
        self.publish_entities(self._last_data)
        return self._last_data

    def _status(self, *_, short=True, medium=True, long=True, **__):
        """
        Take one measurement for each enabled ranging mode.

        :param short: Enable short range measurement (default: True)
        :param medium: Enable medium range measurement (default: True)
        :param long: Enable long range measurement (default: True)
        :returns: dict. Modes whose reading failed are omitted. Example:

        .. code-block:: python

            output = {
                "short": 83,    # Short range measurement in mm
                "medium": 103,  # Medium range measurement in mm
                "long": 200,    # Long range measurement in mm
            }

        """
        ret = {}
        # Insertion order matters: the 1-based position of each entry is the
        # ranging mode passed to the device.
        range_conf = {
            'short': short,
            'medium': medium,
            'long': long,
        }

        for range_idx, (range_name, enabled) in enumerate(
            range_conf.items(), start=1
        ):
            if not enabled:
                continue

            with self._get_device(ranging=range_idx) as device:
                try:
                    ret[range_name] = device.get_distance()
                except Exception as e:
                    self.logger.exception(e)
                    # Short pause before moving on to the next mode.
                    self.wait_stop(1)

        return ret

    @action
    def get_data(self, *args, **kwargs):
        """
        (Deprecated) alias for :meth:`.status`.
        """
        return self.status(*args, **kwargs)

    @action
    def get_measurement(self, *args, **kwargs):
        """
        (Deprecated) alias for :meth:`.status`.
        """
        return self.status(*args, **kwargs)

    @override
    def transform_entities(self, entities: Mapping[str, int]) -> List[Device]:
        """
        Wrap the measurements into a single ``Device`` entity with one
        ``DistanceSensor`` child per range name.

        :param entities: Mapping of range name to measured distance.
        """
        return super().transform_entities(  # type: ignore
            [
                Device(
                    id='vl53l1x',
                    name='VL53L1X Distance Sensor',
                    children=[
                        DistanceSensor(
                            id=f'vl53l1x:{key}',
                            name=f'{key} distance',
                            value=value,
                            unit='mm',
                        )
                        for key, value in entities.items()
                    ],
                )
            ]
        )

    @override
    def publish_entities(self, entities: Mapping[str, int], *_, **__) -> List[Device]:
        """
        Post a ``SensorDataChangeEvent`` with the raw measurements on the
        bus, then publish the entities through the parent implementation.

        :param entities: Mapping of range name to measured distance.
        """
        get_bus().post(
            SensorDataChangeEvent(
                data=entities,
                source=get_plugin_name_by_class(self.__class__),
            )
        )

        return super().publish_entities(entities)  # type: ignore

    @override
    def main(self):
        """
        Polling loop: read the sensor every ``poll_interval`` seconds and
        publish the measurements whenever they differ from the previous ones.
        """
        while not self.should_stop():
            status = self._status()
            if status != self._last_data:
                self.publish_entities(status)

            self._last_data = status
            self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et: