Compare commits

...

25 Commits

Author SHA1 Message Date
Fabio Manganiello d8c429f4a8
Major improvements on the entities engine.
- Better logic to recursively link parent/children entities, so partial
  updates won't get lost.

- Removed `EntitiesCache` - it was too much to maintain while keeping
  consistent with the ORM, and it was a perennial fight against
  SQLAlchemy's own cache.

- Removed `EntityNotifier` - with no need to merge cached entities, the
  `notify` method has become much simpler and it's simply been merged
  in the `EntitiesRepository`.
2023-02-22 02:53:45 +01:00
Fabio Manganiello 9776921836
Better way of handling with `RawSensor` in `bluetooth` integration. 2023-02-22 02:26:51 +01:00
Fabio Manganiello a5a923a752
Added `BluetoothDeviceNewDataEvent`.
These events handle the case where a Bluetooth device only publishes new
service data without advertising any additional updated properties.
2023-02-22 02:23:11 +01:00
Fabio Manganiello dc7cbe743d
Refactored/improved `RawSensor` entity.
It will now automatically deal with most of the native types and convert
them to strings on the db.
2023-02-22 02:19:19 +01:00
Fabio Manganiello b2ffc08c89
s/MultiValueSensor/CompositeSensor/g on `smartthings` 2023-02-22 02:18:12 +01:00
Fabio Manganiello 340fd08064
Removed some old `type: ignore` comments. 2023-02-22 01:29:51 +01:00
Fabio Manganiello cf219d5a48
Added some more docstrings to entities. 2023-02-22 01:02:26 +01:00
Fabio Manganiello 7fa545d7f8
Merge branch 'master' into 29-generic-entities-support 2023-02-22 00:46:33 +01:00
Fabio Manganiello c645ce6bb8
Bump version: 0.24.4 → 0.24.5 2023-02-22 00:32:57 +01:00
Fabio Manganiello 2b8a5fee88
Updated CHANGELOG 2023-02-22 00:32:39 +01:00
Fabio Manganiello 26d9aaa5b1
(Temporarily) specify `sqlalchemy<2.0.0`.
SQLAlchemy 2 has introduced several breaking changes that can break
several things in the application - especially where the code uses
`connection.execute()` with raw SQL statements.

We need to temporarily force the installation of versions from the 1.x
branch, while migrating the existing code to the new version.
2023-02-22 00:25:57 +01:00
Fabio Manganiello bbc9647cb0
s/MultiValueSensor/CompositeSensor/g 2023-02-21 23:14:10 +01:00
Fabio Manganiello 2fa45fc5a3
Documentation and LINT fixes for sensor entities. 2023-02-21 23:10:05 +01:00
Fabio Manganiello b4627ecd04 Removed deprecated use_unicode parameter from MPDClient 2023-02-20 20:35:33 +01:00
Fabio Manganiello aa0b909fff
Use the TheengsDecoder to parse Bluetooth packets and map services to native entities. 2023-02-20 20:27:17 +01:00
Fabio Manganiello 73bf2446bd
Wrap `bluetooth.connect` in a per-device locked section. 2023-02-19 23:11:19 +01:00
Fabio Manganiello 9112239ac3
Better exception management in `AsyncRunnablePlugin`.
Exceptions that cause the termination of the plugin's loop should always
be logged as such, unless the plugin is supposed to stop and various
exceptions may occur upon teardown.
2023-02-19 23:03:27 +01:00
Fabio Manganiello a6c36fa1c1
Added brand, model and model_id columns to `BluetoothDevice`. 2023-02-19 23:02:04 +01:00
Fabio Manganiello 68e6b271c1
Updated dist files 2023-02-19 22:58:20 +01:00
Fabio Manganiello cb9b01c89f
Added raw_sensor metadata 2023-02-19 22:57:50 +01:00
Fabio Manganiello 72a9a9dfcf
LINT/type fixes 2023-02-19 22:56:45 +01:00
Fabio Manganiello 8aedc3c233
Recursively normalize child entities in `EntityManager._normalize_entities` 2023-02-18 17:51:57 +01:00
Fabio Manganiello 613e32e7c1
Extended number of supported events and data fields in Bluetooth integration. 2023-02-18 01:15:10 +01:00
Fabio Manganiello 7adae272a4
Merge branch 'master' into 29-generic-entities-support 2023-02-15 22:24:41 +01:00
Fabio Manganiello 08553f84b9
Added `timeout` parameter to `websocket.send`. 2023-02-15 22:23:15 +01:00
37 changed files with 840 additions and 432 deletions

View File

@ -4,18 +4,24 @@ All notable changes to this project will be documented in this file.
Given the high speed of development in the first phase, changes are being
reported only starting from v0.20.2.
## [Unreleased]
## [0.24.5] - 2023-02-22
### Added
- Added `hid` plugin to support discoverability and data interaction with
generic HID devices - like Bluetooth/USB peripherals, joysticks, dongles and
any other type of devices that supports the HID interface.
- Added `timeout` parameter to `websocket.send` to prevent messages sent on a
non-responsive websocket from getting the websocket loop stuck
### Fixed
- Running the Zeroconf registration logic in another thread in `backend.http`,
so failures in the Zeroconf logic don't affect the startup of the web server.
- (Temporarily) introduced `sqlalchemy < 2.0.0` as a requirement - a PR with a
migration to the new stable version of SQLAlchemy is in TODO.
## [0.24.4] - 2022-12-20

View File

@ -297,6 +297,8 @@ autodoc_mock_imports = [
'aiofiles.os',
'async_lru',
'bleak',
'bluetooth_numbers',
'TheengsGateway',
]
sys.path.insert(0, os.path.abspath('../..'))

View File

@ -25,7 +25,7 @@ from .message.response import Response
from .utils import set_thread_name, get_enabled_plugins
__author__ = 'Fabio Manganiello <info@fabiomanganiello.com>'
__version__ = '0.24.4'
__version__ = '0.24.5'
log = logging.getLogger('platypush')

View File

@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width,initial-scale=1"><link rel="icon" href="/favicon.ico"><link rel="stylesheet" href="/fonts/poppins.css"><title>platypush</title><script defer="defer" type="module" src="/static/js/chunk-vendors.95bedba1.js"></script><script defer="defer" type="module" src="/static/js/app.aa9a5927.js"></script><link href="/static/css/chunk-vendors.0fcd36f0.css" rel="stylesheet"><link href="/static/css/app.d7cb662c.css" rel="stylesheet"><script defer="defer" src="/static/js/chunk-vendors-legacy.79dede0c.js" nomodule></script><script defer="defer" src="/static/js/app-legacy.6a27238d.js" nomodule></script></head><body><noscript><strong>We're sorry but platypush doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id="app"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width,initial-scale=1"><link rel="icon" href="/favicon.ico"><link rel="stylesheet" href="/fonts/poppins.css"><title>platypush</title><script defer="defer" type="module" src="/static/js/chunk-vendors.95bedba1.js"></script><script defer="defer" type="module" src="/static/js/app.484f9c7c.js"></script><link href="/static/css/chunk-vendors.0fcd36f0.css" rel="stylesheet"><link href="/static/css/app.d7cb662c.css" rel="stylesheet"><script defer="defer" src="/static/js/chunk-vendors-legacy.79dede0c.js" nomodule></script><script defer="defer" src="/static/js/app-legacy.36cc00f9.js" nomodule></script></head><body><noscript><strong>We're sorry but platypush doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id="app"></div></body></html>

View File

@ -159,7 +159,7 @@
}
},
"multi_value_sensor": {
"composite_sensor": {
"name": "Sensor",
"name_plural": "Sensors",
"icon": {
@ -191,6 +191,14 @@
}
},
"raw_sensor": {
"name": "Sensor",
"name_plural": "Sensors",
"icon": {
"class": "fas fa-thermometer"
}
},
"sensor": {
"name": "Sensor",
"name_plural": "Sensors",

View File

@ -1,10 +1,12 @@
from logging import getLogger
from threading import Thread, Event
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent
from platypush.utils import set_thread_name
from platypush.entities._engine.notifier import EntityNotifier
# pylint: disable=no-name-in-module
from platypush.entities._engine.queue import EntitiesQueue
from platypush.entities._engine.repo import EntitiesRepository
@ -18,7 +20,7 @@ class EntitiesEngine(Thread):
1. Consume entities from a queue (synchronized with the upstream
integrations that produce/handle them). The producer/consumer model
ensure that only this thread writes to the database, packs events
together (preventing eccessive writes and throttling events), and
together (preventing excessive writes and throttling events), and
prevents race conditions when SQLite is used.
2. Merge any existing entities with their newer representations.
3. Update the entities taxonomy.
@ -35,7 +37,6 @@ class EntitiesEngine(Thread):
self._should_stop = Event()
self._queue = EntitiesQueue(stop_event=self._should_stop)
self._repo = EntitiesRepository()
self._notifier = EntityNotifier(self._repo._cache)
def post(self, *entities: Entity):
self._queue.put(*entities)
@ -47,6 +48,15 @@ class EntitiesEngine(Thread):
def stop(self):
self._should_stop.set()
def notify(self, *entities: Entity):
"""
Trigger an EntityUpdateEvent if the entity has been persisted, or queue
it to the list of entities whose notifications will be flushed when the
session is committed.
"""
for entity in entities:
get_bus().post(EntityUpdateEvent(entity=entity))
def run(self):
super().run()
set_thread_name('entities')
@ -58,19 +68,15 @@ class EntitiesEngine(Thread):
if not entities or self.should_stop:
continue
# Trigger/prepare EntityUpdateEvent objects
for entity in entities:
self._notifier.notify(entity)
# Store the batch of entities
try:
entities = self._repo.save(*entities)
except Exception as e:
self.logger.error('Error while processing entity updates: ' + str(e))
self.logger.error('Error while processing entity updates: %s', e)
self.logger.exception(e)
continue
# Flush any pending notifications
self._notifier.flush(*entities)
# Trigger EntityUpdateEvent events
self.notify(*entities)
self.logger.info('Stopped entities engine')

View File

@ -1,47 +0,0 @@
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent
from platypush.entities._engine.repo.cache import EntitiesCache
class EntityNotifier:
"""
This object is in charge of forwarding EntityUpdateEvent instances on the
application bus when some entities are changed.
"""
def __init__(self, cache: EntitiesCache):
self._cache = cache
self._entities_awaiting_flush = set()
def _populate_entity_id_from_cache(self, new_entity: Entity):
cached_entity = self._cache.get(new_entity)
if cached_entity and cached_entity.id:
new_entity.id = cached_entity.id
if new_entity.id:
self._cache.update(new_entity)
def notify(self, entity: Entity):
"""
Trigger an EntityUpdateEvent if the entity has been persisted, or queue
it to the list of entities whose notifications will be flushed when the
session is committed.
"""
self._populate_entity_id_from_cache(entity)
if entity.id:
get_bus().post(EntityUpdateEvent(entity=entity))
else:
self._entities_awaiting_flush.add(entity.entity_key)
def flush(self, *entities: Entity):
"""
Flush and process any entities with pending EntityUpdateEvent
notifications.
"""
entities_awaiting_flush = {*self._entities_awaiting_flush}
for entity in entities:
key = entity.entity_key
if key in entities_awaiting_flush:
self.notify(entity)
self._entities_awaiting_flush.remove(key)

View File

@ -1,10 +1,11 @@
import logging
from typing import Dict, Iterable, Tuple
from sqlalchemy.orm import Session, make_transient
from sqlalchemy.orm import Session
from platypush.entities import Entity
from platypush.entities._engine.repo.cache import EntitiesCache
# pylint: disable=no-name-in-module
from platypush.entities._engine.repo.db import EntitiesDb
from platypush.entities._engine.repo.merger import EntitiesMerger
@ -13,74 +14,31 @@ logger = logging.getLogger('entities')
class EntitiesRepository:
"""
This object is used to get and save entities, and it wraps database and
cache objects.
This object is used to get and save entities. It wraps the database
connection.
"""
def __init__(self):
self._cache = EntitiesCache()
self._db = EntitiesDb()
self._merger = EntitiesMerger(self)
self._init_entities_cache()
def _init_entities_cache(self):
"""
Initializes the repository with the existing entities.
"""
logger.info('Initializing entities cache')
with self._db.get_session() as session:
entities = session.query(Entity).all()
for entity in entities:
make_transient(entity)
self._cache.update(*entities, overwrite=True)
logger.info('Entities cache initialized')
def get(
self, session: Session, entities: Iterable[Entity], use_cache=True
self, session: Session, entities: Iterable[Entity]
) -> Dict[Tuple[str, str], Entity]:
"""
Given a set of entity objects, it returns those that already exist
(or have the same ``entity_key``). It looks up both the cache and the
database.
(or have the same ``entity_key``).
"""
existing_entities = {}
if not use_cache:
existing_entities = self._db.fetch(session, entities)
self._cache.update(*existing_entities.values())
else:
# Fetch the entities that exist in the cache
existing_entities = {
e.entity_key: self._cache.get(e) for e in entities if self._cache.get(e)
}
# Retrieve from the database the entities that miss from the cache
cache_miss_entities = {
e.entity_key: e
for e in entities
if e.entity_key not in existing_entities
}
cache_miss_existing_entities = self._db.fetch(
session, cache_miss_entities.values()
)
# Update the cache
self._cache.update(*cache_miss_existing_entities.values())
# Return the union of the cached + retrieved entities
existing_entities.update(cache_miss_existing_entities)
return existing_entities
return self._db.fetch(session, entities)
def save(self, *entities: Entity) -> Iterable[Entity]:
"""
Perform an upsert of entities after merging duplicates and rebuilding
the taxonomies. It updates both the database and the cache.
the taxonomies.
"""
with self._db.get_session(locked=True, autoflush=False) as session:
merged_entities = self._merger.merge(session, entities)
merged_entities = self._db.upsert(session, merged_entities)
self._cache.update(*merged_entities, overwrite=True)
return merged_entities

View File

@ -1,51 +0,0 @@
from threading import RLock
from typing import Dict, Optional, Tuple
from platypush.entities import Entity
class EntitiesCache:
"""
An auxiliary class to model an entities lookup cache with multiple keys.
"""
def __init__(self):
self.by_id: Dict[str, Entity] = {}
self.by_external_id_and_plugin: Dict[Tuple[str, str], Entity] = {}
self._lock = RLock()
def get(self, entity: Entity) -> Optional[Entity]:
"""
Retrieve the cached representation of an entity, if it exists.
"""
if entity.id:
e = self.by_id.get(str(entity.id))
if e:
return e
if entity.external_id and entity.plugin:
e = self.by_external_id_and_plugin.get(
(str(entity.external_id), str(entity.plugin))
)
if e:
return e
def update(self, *entities: Entity, overwrite=False):
"""
Update the cache with a list of new entities.
"""
with self._lock:
for entity in entities:
if not overwrite:
existing_entity = self.by_id.get(str(entity.id))
if existing_entity:
for k, v in existing_entity.to_json().items():
if getattr(entity, k, None) is None:
setattr(entity, k, v)
if entity.id:
self.by_id[str(entity.id)] = entity
if entity.external_id and entity.plugin:
self.by_external_id_and_plugin[
(str(entity.external_id), str(entity.plugin))
] = entity

View File

@ -82,9 +82,15 @@ class EntitiesDb:
rewired. Otherwise, we may end up with conflicts on entities that have
already been flushed.
"""
# Index childrens by parent_id and by parent_key
children_by_parent_id = defaultdict(lambda: defaultdict(Entity))
children_by_parent_key = defaultdict(lambda: defaultdict(Entity))
# Index children by parent_id and by parent_key
children_by_parent_id: Dict[int, Dict[Tuple[str, str], Entity]] = defaultdict(
lambda: defaultdict(Entity)
)
children_by_parent_key: Dict[
Tuple[str, str], Dict[Tuple[str, str], Entity]
] = defaultdict(lambda: defaultdict(Entity))
for entity in entities:
parent_key = None
parent_id = entity.parent_id
@ -113,8 +119,8 @@ class EntitiesDb:
_TaxonomyAwareEntity(entity=e, level=0) for e in root_entities
]
batches = []
current_batch = []
batches: List[List[Entity]] = []
current_batch: List[_TaxonomyAwareEntity] = []
while entities_to_process:
# Pop the first element in the list (FIFO implementation)

View File

@ -12,7 +12,7 @@ class EntitiesMerger:
"""
def __init__(self, repository):
from platypush.entities._engine.repo import EntitiesRepository
from . import EntitiesRepository
self._repo: EntitiesRepository = repository
@ -26,8 +26,8 @@ class EntitiesMerger:
the parent/child relationships and return a tuple with
``[new_entities, updated_entities]``.
"""
new_entities = {}
existing_entities = {}
new_entities: Dict[Tuple[str, str], Entity] = {}
existing_entities: Dict[Tuple[str, str], Entity] = {}
self._merge(
session,
@ -49,7 +49,7 @@ class EntitiesMerger:
(Recursive) inner implementation of the entity merge logic.
"""
processed_entities = []
existing_entities.update(self._repo.get(session, entities, use_cache=False))
existing_entities.update(self._repo.get(session, entities))
# Make sure that we have no duplicate entity keys in the current batch
entities = list(
@ -75,10 +75,13 @@ class EntitiesMerger:
if not parent_id and parent:
existing_entity.parent = parent
else:
existing_entity.parent_id = parent_id # type: ignore
existing_entity.parent_id = parent_id
# Merge the other columns
self._merge_columns(entity, existing_entity)
# Merge the children
self._merge(session, entity.children, new_entities, existing_entities)
# Use the updated version of the existing entity.
entity = existing_entity
else:
# Add it to the map of new entities if the entity doesn't exist
@ -99,7 +102,7 @@ class EntitiesMerger:
Recursively update the hierarchy of an entity, moving upwards towards
the parent.
"""
parent_id: Optional[int] = entity.parent_id # type: ignore
parent_id: Optional[int] = entity.parent_id
parent: Optional[Entity] = entity.parent
# If the entity has a parent with an ID, use that
@ -109,12 +112,12 @@ class EntitiesMerger:
# If there's no parent_id but there is a parent object, try to fetch
# its stored version
if not parent_id and parent:
batch = list(self._repo.get(session, [parent], use_cache=False).values())
batch = list(self._repo.get(session, [parent]).values())
# If the parent is already stored, use its ID
if batch:
parent = batch[0]
parent_id = parent.id # type: ignore
parent_id = parent.id
# Otherwise, check if its key is already among those awaiting flush
# and reuse the same objects (prevents SQLAlchemy from generating
@ -135,7 +138,7 @@ class EntitiesMerger:
# flushing)
if parent_id:
entity.parent = None
entity.parent_id = parent_id # type: ignore
entity.parent_id = parent_id
return parent_id, parent
@ -175,9 +178,9 @@ class EntitiesMerger:
columns = [col.key for col in entity.columns]
for col in columns:
if col == 'meta':
existing_entity.meta = { # type: ignore
**(existing_entity.meta or {}), # type: ignore
**(entity.meta or {}), # type: ignore
existing_entity.meta = {
**(existing_entity.meta or {}),
**(entity.meta or {}),
}
elif col not in ('id', 'created_at'):
setattr(existing_entity, col, getattr(entity, col))

View File

@ -75,6 +75,7 @@ class EntityManager(ABC):
entity.plugin = get_plugin_name_by_class(self.__class__) # type: ignore
entity.updated_at = datetime.utcnow() # type: ignore
entity.children = self._normalize_entities(entity.children)
return entities

View File

@ -1,4 +1,11 @@
from sqlalchemy import Column, Integer, Boolean, ForeignKey
from sqlalchemy import (
Boolean,
Column,
ForeignKey,
Integer,
JSON,
String,
)
from platypush.common.db import Base
@ -17,7 +24,55 @@ if 'bluetooth_device' not in Base.metadata:
id = Column(
Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True
)
connected = Column(Boolean, default=False)
""" Whether the device is connected. """
paired = Column(Boolean, default=False)
""" Whether the device is paired. """
trusted = Column(Boolean, default=False)
""" Whether the device is trusted. """
blocked = Column(Boolean, default=False)
""" Whether the device is blocked. """
rssi = Column(Integer, default=None)
""" Received Signal Strength Indicator. """
tx_power = Column(Integer, default=None)
""" Reported transmission power. """
manufacturers = Column(JSON)
""" Registered manufacturers for the device, as an ID -> Name map. """
uuids = Column(JSON)
"""
Service/characteristic UUIDs exposed by the device, as a
UUID -> Name map.
"""
brand = Column(String)
""" Device brand, as a string. """
model = Column(String)
""" Device model, as a string. """
model_id = Column(String)
""" Device model ID. """
manufacturer_data = Column(JSON)
"""
Latest manufacturer data published by the device, as a
``manufacturer_id -> data`` map, where ``data`` is a hexadecimal
string.
"""
service_data = Column(JSON)
"""
Latest service data published by the device, as a ``service_uuid ->
data`` map, where ``data`` is a hexadecimal string.
"""
__mapper_args__ = {
'polymorphic_identity': __tablename__,

View File

@ -8,6 +8,10 @@ from ._base import Entity
if 'device' not in Base.metadata:
class Device(Entity):
"""
Base class to model device entities.
"""
__tablename__ = 'device'
id = Column(

View File

@ -8,6 +8,11 @@ from .devices import Device
if 'dimmer' not in Base.metadata:
class Dimmer(Device):
"""
This class models dimmer entities. A dimmer is any actionable entity
with numeric values and an optional min/max range.
"""
__tablename__ = 'dimmer'
id = Column(

View File

@ -1,4 +1,6 @@
import json
import logging
from typing import Optional, Union
from sqlalchemy import (
Boolean,
@ -18,21 +20,65 @@ logger = logging.getLogger(__name__)
class Sensor(Device):
"""
Abstract class for sensor entities. A sensor entity is, by definition, an
entity with the ``is_read_only`` property set to ``True``.
"""
__abstract__ = True
def __init__(self, *args, is_read_only=True, **kwargs):
super().__init__(*args, is_read_only=is_read_only, **kwargs)
def __init__(self, *args, **kwargs):
kwargs['is_read_only'] = True
super().__init__(*args, **kwargs)
if 'raw_sensor' not in Base.metadata:
class RawSensor(Sensor):
"""
Models a raw sensor, whose value can contain either a string, a
hex-encoded binary string, or a JSON-encoded object.
"""
__tablename__ = 'raw_sensor'
id = Column(
Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True
)
value = Column(String)
_value = Column(String)
is_binary = Column(Boolean, default=False)
""" If ``is_binary`` is ``True``, then ``value`` is a hex string. """
is_json = Column(Boolean, default=False)
"""
If ``is_json`` is ``True``, then ``value`` is a JSON-encoded string
object or array.
"""
@property
def value(self):
if self.is_binary:
return self._value.decode()
if self.is_json:
return json.loads(self._value)
return self._value
@value.setter
def value(
self, value: Optional[Union[str, bytearray, bytes, list, tuple, set, dict]]
):
if isinstance(value, (bytearray, bytes)):
self._value = '0x' + ''.join([f'{x:02x}' for x in value])
self.is_binary = True
elif isinstance(value, (list, tuple, set)):
self._value = json.dumps(list(value))
self.is_json = True
elif isinstance(value, dict):
self._value = json.dumps(value)
self.is_json = True
else:
self._value = value
self.is_binary = False
self.is_json = False
__mapper_args__ = {
'polymorphic_identity': __tablename__,
@ -42,6 +88,11 @@ if 'raw_sensor' not in Base.metadata:
if 'numeric_sensor' not in Base.metadata:
class NumericSensor(Sensor):
"""
Models a numeric sensor, with a numeric value and an optional min/max
range.
"""
__tablename__ = 'numeric_sensor'
id = Column(
@ -60,18 +111,22 @@ if 'numeric_sensor' not in Base.metadata:
if 'binary_sensor' not in Base.metadata:
class BinarySensor(Sensor):
"""
Models a binary sensor, with a binary boolean value.
"""
__tablename__ = 'binary_sensor'
def __init__(self, *args, value=None, **kwargs):
if isinstance(value, str):
value = value.lower()
if value in {True, 1, '1', 't', 'true', 'on', 'ON'}:
if str(value).lower() in {'1', 't', 'true', 'on'}:
value = True
elif value in {False, 0, '0', 'f', 'false', 'off', 'OFF'}:
elif str(value).lower() in {'0', 'f', 'false', 'off'}:
value = False
elif value is not None:
logger.warning(f'Unsupported value for BinarySensor type: {value}')
logger.warning('Unsupported value for BinarySensor type: %s', value)
value = None
super().__init__(*args, value=value, **kwargs)
@ -89,6 +144,10 @@ if 'binary_sensor' not in Base.metadata:
if 'enum_sensor' not in Base.metadata:
class EnumSensor(Sensor):
"""
Models an enum sensor, whose value belongs to a set of pre-defined values.
"""
__tablename__ = 'enum_sensor'
id = Column(
@ -96,16 +155,22 @@ if 'enum_sensor' not in Base.metadata:
)
value = Column(String)
values = Column(JSON)
""" Possible values for the sensor, as a JSON array. """
__mapper_args__ = {
'polymorphic_identity': __tablename__,
}
if 'multi_value_sensor' not in Base.metadata:
if 'composite_sensor' not in Base.metadata:
class MultiValueSensor(Sensor):
__tablename__ = 'multi_value_sensor'
class CompositeSensor(Sensor):
"""
A composite sensor is a sensor whose value can be mapped to a JSON
object (either a dictionary or an array)
"""
__tablename__ = 'composite_sensor'
id = Column(
Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True

View File

@ -1,4 +1,4 @@
from typing import Optional
from typing import Dict, Optional
from platypush.message.event import Event
@ -8,44 +8,169 @@ class BluetoothEvent(Event):
Base class for Bluetooth events.
"""
def __init__(self, address: str, *args, name: Optional[str] = None, **kwargs):
super().__init__(*args, address=address, name=name, **kwargs)
class BluetoothWithPortEvent(BluetoothEvent):
class BluetoothScanPausedEvent(BluetoothEvent):
"""
Base class for Bluetooth events that include a communication port.
Event triggered when the Bluetooth scan is paused.
"""
def __init__(self, *args, duration: Optional[float] = None, **kwargs):
super().__init__(*args, duration=duration, **kwargs)
class BluetoothScanResumedEvent(BluetoothEvent):
"""
Event triggered when the Bluetooth scan is resumed.
"""
def __init__(self, *args, duration: Optional[float] = None, **kwargs):
super().__init__(*args, duration=duration, **kwargs)
class BluetoothWithPortEvent(Event):
"""
Base class for Bluetooth events with an associated port.
"""
def __init__(self, *args, port: Optional[str] = None, **kwargs):
"""
:param port: The communication port of the device.
"""
super().__init__(*args, port=port, **kwargs)
class BluetoothDeviceFoundEvent(BluetoothEvent):
class BluetoothDeviceEvent(BluetoothWithPortEvent):
"""
Event triggered when a Bluetooth device is found during a scan.
Base class for Bluetooth device events.
"""
def __init__(
self,
*args,
address: str,
connected: bool,
paired: bool,
trusted: bool,
blocked: bool,
name: Optional[str] = None,
uuids: Optional[Dict[str, str]] = None,
rssi: Optional[int] = None,
tx_power: Optional[int] = None,
manufacturers: Optional[Dict[int, str]] = None,
manufacturer_data: Optional[Dict[int, str]] = None,
service_data: Optional[Dict[str, str]] = None,
**kwargs
):
"""
:param address: The Bluetooth address of the device.
:param connected: Whether the device is connected.
:param paired: Whether the device is paired.
:param trusted: Whether the device is trusted.
:param blocked: Whether the device is blocked.
:param name: The name of the device.
:param uuids: The UUIDs of the services exposed by the device.
:param rssi: Received Signal Strength Indicator.
:param tx_power: Transmission power.
:param manufacturers: The manufacturers published by the device, as a
``manufacturer_id -> registered_name`` map.
:param manufacturer_data: The manufacturer data published by the
device, as a ``manufacturer_id -> data`` map, where ``data`` is a
hexadecimal string.
:param service_data: The service data published by the device, as a
``service_uuid -> data`` map, where ``data`` is a hexadecimal string.
"""
super().__init__(
*args,
address=address,
name=name,
connected=connected,
paired=paired,
blocked=blocked,
trusted=trusted,
uuids=uuids or {},
rssi=rssi,
tx_power=tx_power,
manufacturers=manufacturers or {},
manufacturer_data=manufacturer_data or {},
service_data=service_data or {},
**kwargs
)
class BluetoothDeviceNewDataEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device publishes new manufacturer/service
data.
"""
class BluetoothDeviceLostEvent(BluetoothEvent):
class BluetoothDeviceFoundEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device previously scanned is lost.
Event triggered when a Bluetooth device is discovered during a scan.
"""
class BluetoothDeviceConnectedEvent(BluetoothWithPortEvent):
class BluetoothDeviceLostEvent(BluetoothDeviceEvent):
"""
Event triggered when a previously discovered Bluetooth device is lost.
"""
class BluetoothDeviceConnectedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is connected.
"""
class BluetoothDeviceDisconnectedEvent(BluetoothWithPortEvent):
class BluetoothDeviceDisconnectedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is disconnected.
"""
class BluetoothConnectionRejectedEvent(BluetoothWithPortEvent):
class BluetoothDevicePairedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is paired.
"""
class BluetoothDeviceUnpairedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is unpaired.
"""
class BluetoothDeviceBlockedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is blocked.
"""
class BluetoothDeviceUnblockedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is unblocked.
"""
class BluetoothDeviceTrustedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is trusted.
"""
class BluetoothDeviceSignalUpdateEvent(BluetoothDeviceEvent):
"""
Event triggered when the RSSI/TX power of a Bluetooth device is updated.
"""
class BluetoothDeviceUntrustedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is untrusted.
"""
class BluetoothConnectionRejectedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth connection is rejected.
"""

View File

@ -1,130 +0,0 @@
from typing import Collection, Optional
from platypush.message.event import Event
class BluetoothEvent(Event):
"""
Base class for Bluetooth events.
"""
class BluetoothScanPausedEvent(BluetoothEvent):
"""
Event triggered when the Bluetooth scan is paused.
"""
def __init__(self, *args, duration: Optional[float] = None, **kwargs):
super().__init__(*args, duration=duration, **kwargs)
class BluetoothScanResumedEvent(BluetoothEvent):
"""
Event triggered when the Bluetooth scan is resumed.
"""
def __init__(self, *args, duration: Optional[float] = None, **kwargs):
super().__init__(*args, duration=duration, **kwargs)
class BluetoothDeviceEvent(BluetoothEvent):
"""
Base class for Bluetooth device events.
"""
def __init__(
self,
*args,
address: str,
connected: bool,
paired: bool,
trusted: bool,
blocked: bool,
name: Optional[str] = None,
characteristics: Optional[Collection[str]] = None,
**kwargs
):
"""
:param address: The Bluetooth address of the device.
:param connected: Whether the device is connected.
:param paired: Whether the device is paired.
:param trusted: Whether the device is trusted.
:param blocked: Whether the device is blocked.
:param name: The name of the device.
:param characteristics: The UUIDs of the characteristics exposed by the
device.
"""
super().__init__(
*args,
address=address,
name=name,
connected=connected,
paired=paired,
blocked=blocked,
trusted=trusted,
characteristics=characteristics or [],
**kwargs
)
class BluetoothDeviceFoundEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is discovered during a scan.
"""
class BluetoothDeviceLostEvent(BluetoothDeviceEvent):
"""
Event triggered when a previously discovered Bluetooth device is lost.
"""
class BluetoothDeviceConnectedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is connected.
"""
class BluetoothDeviceDisconnectedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is disconnected.
"""
class BluetoothDevicePairedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is paired.
"""
class BluetoothDeviceUnpairedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is unpaired.
"""
class BluetoothDeviceBlockedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is blocked.
"""
class BluetoothDeviceUnblockedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is unblocked.
"""
class BluetoothDeviceTrustedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is trusted.
"""
class BluetoothDeviceUntrustedEvent(BluetoothDeviceEvent):
"""
Event triggered when a Bluetooth device is untrusted.
"""
# vim:sw=4:ts=4:et:

View File

@ -212,7 +212,9 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC):
try:
self._loop.run_until_complete(self._task)
except Exception as e:
self.logger.info('The loop has terminated with an error: %s', e)
if not self.should_stop():
self.logger.warning('The loop has terminated with an error')
self.logger.exception(e)
self._task.cancel()

View File

@ -1,24 +1,38 @@
import base64
from asyncio import Event, ensure_future
from asyncio import Event, Lock, ensure_future
from contextlib import asynccontextmanager
from threading import RLock, Timer
from typing import AsyncGenerator, Collection, List, Optional, Dict, Type, Union
from time import time
from typing import (
Any,
AsyncGenerator,
Collection,
Final,
List,
Optional,
Dict,
Type,
Union,
)
from uuid import UUID
from bleak import BleakClient, BleakScanner
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from typing_extensions import override
from platypush.context import get_bus, get_or_create_event_loop
from platypush.entities import Entity, EntityManager
from platypush.entities.bluetooth import BluetoothDevice
from platypush.message.event.bluetooth.ble import (
from platypush.message.event.bluetooth import (
BluetoothDeviceBlockedEvent,
BluetoothDeviceConnectedEvent,
BluetoothDeviceDisconnectedEvent,
BluetoothDeviceFoundEvent,
BluetoothDeviceLostEvent,
BluetoothDeviceNewDataEvent,
BluetoothDevicePairedEvent,
BluetoothDeviceSignalUpdateEvent,
BluetoothDeviceTrustedEvent,
BluetoothDeviceUnblockedEvent,
BluetoothDeviceUnpairedEvent,
@ -29,6 +43,8 @@ from platypush.message.event.bluetooth.ble import (
)
from platypush.plugins import AsyncRunnablePlugin, action
from ._mappers import device_to_entity, parse_device_args
UUIDType = Union[str, UUID]
@ -36,26 +52,55 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
"""
Plugin to interact with BLE (Bluetooth Low-Energy) devices.
This plugin uses `_Bleak_ <https://github.com/hbldh/bleak>`_ to interact
with the Bluetooth stack and `_Theengs_ <https://github.com/theengs/decoder>`_
to map the services exposed by the devices into native entities.
The full list of devices natively supported can be found
`here <https://decoder.theengs.io/devices/devices_by_brand.html>`_.
Note that the support for Bluetooth low-energy devices requires a Bluetooth
adapter compatible with the Bluetooth 5.0 specification or higher.
Requires:
* **bleak** (``pip install bleak``)
* **bluetooth-numbers** (``pip install bluetooth-numbers``)
* **TheengsGateway** (``pip install git+https://github.com/BlackLight/TheengsGateway``)
TODO: Write supported events.
Triggers:
* :class:`platypush.message.event.bluetooth.BluetoothDeviceBlockedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceConnectedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceFoundEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceLostEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceNewDataEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDevicePairedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceTrustedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceUnblockedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceUnpairedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothDeviceUntrustedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothScanPausedEvent`
* :class:`platypush.message.event.bluetooth.BluetoothScanResumedEvent`
"""
# Default connection timeout (in seconds)
_default_connect_timeout = 5
_default_connect_timeout: Final[int] = 5
""" Default connection timeout (in seconds) """
_rssi_update_interval: Final[int] = 30
"""
How long we should wait before triggering an update event upon a new
RSSI update, in seconds.
"""
def __init__(
self,
interface: Optional[str] = None,
connect_timeout: float = _default_connect_timeout,
device_names: Optional[Dict[str, str]] = None,
characteristics: Optional[Collection[UUIDType]] = None,
uuids: Optional[Collection[UUIDType]] = None,
scan_paused_on_start: bool = False,
**kwargs,
):
@ -64,8 +109,8 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
on Linux). Default: first available interface.
:param connect_timeout: Timeout in seconds for the connection to a
Bluetooth device. Default: 5 seconds.
:param characteristics: List of service/characteristic UUIDs to
discover. Default: all.
:param uuids: List of service/characteristic UUIDs to discover.
Default: all.
:param device_names: Bluetooth address -> device name mapping. If not
specified, the device's advertised name will be used, or its
Bluetooth address. Example:
@ -85,14 +130,17 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
"""
super().__init__(**kwargs)
self._interface = interface
self._connect_timeout = connect_timeout
self._characteristics = characteristics
self._interface: Optional[str] = interface
self._connect_timeout: float = connect_timeout
self._uuids: Collection[Union[str, UUID]] = uuids or []
self._scan_lock = RLock()
self._scan_enabled = Event()
self._scan_controller_timer: Optional[Timer] = None
self._connections: Dict[str, BleakClient] = {}
self._connection_locks: Dict[str, Lock] = {}
self._devices: Dict[str, BLEDevice] = {}
self._entities: Dict[str, BluetoothDevice] = {}
self._device_last_updated_at: Dict[str, float] = {}
self._device_name_by_addr = device_names or {}
self._device_addr_by_name = {
name: addr for addr, name in self._device_name_by_addr.items()
@ -119,65 +167,80 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
assert dev is not None, f'Unknown device: "{device}"'
return dev
def _get_device_name(self, device: BLEDevice) -> str:
return (
self._device_name_by_addr.get(device.address)
or device.name
or device.address
)
def _post_event(
self, event_type: Type[BluetoothDeviceEvent], device: BLEDevice, **kwargs
):
props = device.details.get('props', {})
get_bus().post(
event_type(
address=device.address,
name=self._get_device_name(device),
connected=props.get('Connected', False),
paired=props.get('Paired', False),
blocked=props.get('Blocked', False),
trusted=props.get('Trusted', False),
characteristics=device.metadata.get('uuids', []),
**kwargs,
)
event_type(address=device.address, **parse_device_args(device), **kwargs)
)
def _on_device_event(self, device: BLEDevice, _):
def _on_device_event(self, device: BLEDevice, data: AdvertisementData):
"""
Device advertisement packet callback handler.
1. It generates the relevant
:class:`platypush.message.event.bluetooth.BluetoothDeviceEvent` if the
state of the device has changed.
2. It builds the relevant
:class:`platypush.entity.bluetooth.BluetoothDevice` entity object
populated with children entities that contain the supported
properties.
:param device: The Bluetooth device.
:param data: The advertisement data.
"""
event_types: List[Type[BluetoothDeviceEvent]] = []
entity = device_to_entity(device, data)
existing_entity = self._entities.get(device.address)
existing_device = self._devices.get(device.address)
if existing_device:
old_props = existing_device.details.get('props', {})
new_props = device.details.get('props', {})
if old_props.get('Paired') != new_props.get('Paired'):
if existing_entity and existing_device:
if existing_entity.paired != entity.paired:
event_types.append(
BluetoothDevicePairedEvent
if new_props.get('Paired')
if entity.paired
else BluetoothDeviceUnpairedEvent
)
if old_props.get('Connected') != new_props.get('Connected'):
if existing_entity.connected != entity.connected:
event_types.append(
BluetoothDeviceConnectedEvent
if new_props.get('Connected')
if entity.connected
else BluetoothDeviceDisconnectedEvent
)
if old_props.get('Blocked') != new_props.get('Blocked'):
if existing_entity.blocked != entity.blocked:
event_types.append(
BluetoothDeviceBlockedEvent
if new_props.get('Blocked')
if entity.blocked
else BluetoothDeviceUnblockedEvent
)
if old_props.get('Trusted') != new_props.get('Trusted'):
if existing_entity.trusted != entity.trusted:
event_types.append(
BluetoothDeviceTrustedEvent
if new_props.get('Trusted')
if entity.trusted
else BluetoothDeviceUntrustedEvent
)
if (
time() - self._device_last_updated_at.get(device.address, 0)
) >= self._rssi_update_interval and (
existing_entity.rssi != device.rssi
or existing_entity.tx_power != entity.tx_power
):
event_types.append(BluetoothDeviceSignalUpdateEvent)
if (
existing_device.metadata.get('manufacturer_data', {})
!= device.metadata.get('manufacturer_data', {})
) or (
existing_device.details.get('props', {}).get('ServiceData', {})
!= device.details.get('props', {}).get('ServiceData', {})
):
event_types.append(BluetoothDeviceNewDataEvent)
else:
event_types.append(BluetoothDeviceFoundEvent)
@ -189,7 +252,31 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
if event_types:
for event_type in event_types:
self._post_event(event_type, device)
self.publish_entities([device])
self._device_last_updated_at[device.address] = time()
for child in entity.children:
child.parent = entity
self.publish_entities([entity])
def _has_changed(self, entity: BluetoothDevice) -> bool:
existing_entity = self._entities.get(entity.id or entity.external_id)
# If the entity didn't exist before, it's a new device.
if not existing_entity:
return True
entity_dict = entity.to_json()
existing_entity_dict = entity.to_json()
# Check if any of the root attributes changed, excluding those that are
# managed by the entities engine).
return any(
attr
for attr, value in entity_dict.items()
if value != existing_entity_dict.get(attr)
and attr not in {'id', 'external_id', 'plugin', 'updated_at'}
)
@asynccontextmanager
async def _connect(
@ -199,14 +286,18 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
timeout: Optional[float] = None,
) -> AsyncGenerator[BleakClient, None]:
dev = await self._get_device(device)
async with BleakClient(
dev.address,
adapter=interface or self._interface,
timeout=timeout or self._connect_timeout,
) as client:
self._connections[dev.address] = client
yield client
self._connections.pop(dev.address)
async with self._connection_locks.get(dev.address, Lock()) as lock:
self._connection_locks[dev.address] = lock or Lock()
async with BleakClient(
dev.address,
adapter=interface or self._interface,
timeout=timeout or self._connect_timeout,
) as client:
self._connections[dev.address] = client
yield client
self._connections.pop(dev.address, None)
async def _read(
self,
@ -234,28 +325,26 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
async def _scan(
self,
duration: Optional[float] = None,
characteristics: Optional[Collection[UUIDType]] = None,
publish_entities: bool = False,
uuids: Optional[Collection[UUIDType]] = None,
) -> Collection[Entity]:
with self._scan_lock:
timeout = duration or self.poll_interval or 5
devices = await BleakScanner.discover(
adapter=self._interface,
timeout=timeout,
service_uuids=list(
map(str, characteristics or self._characteristics or [])
),
service_uuids=list(map(str, uuids or self._uuids or [])),
detection_callback=self._on_device_event,
)
# TODO Infer type from device.metadata['manufacturer_data']
self._devices.update({dev.address: dev for dev in devices})
return (
self.publish_entities(devices)
if publish_entities
else self.transform_entities(devices)
)
addresses = {dev.address.lower() for dev in devices}
return [
dev
for addr, dev in self._entities.items()
if isinstance(dev, BluetoothDevice)
and addr.lower() in addresses
and dev.reachable
]
async def _scan_state_set(self, state: bool, duration: Optional[float] = None):
def timer_callback():
@ -307,7 +396,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
def scan(
self,
duration: Optional[float] = None,
characteristics: Optional[Collection[UUIDType]] = None,
uuids: Optional[Collection[UUIDType]] = None,
):
"""
Scan for Bluetooth devices nearby and return the results as a list of
@ -315,12 +404,10 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
:param duration: Scan duration in seconds (default: same as the plugin's
`poll_interval` configuration parameter)
:param characteristics: List of characteristic UUIDs to discover. Default: all.
:param uuids: List of characteristic UUIDs to discover. Default: all.
"""
loop = get_or_create_event_loop()
return loop.run_until_complete(
self._scan(duration, characteristics, publish_entities=True)
)
return loop.run_until_complete(self._scan(duration, uuids))
@action
def read(
@ -381,15 +468,25 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
"""
return self.scan().output
@override
def publish_entities(
self, entities: Optional[Collection[Any]]
) -> Collection[Entity]:
self._entities.update({entity.id: entity for entity in (entities or [])})
return super().publish_entities(entities)
@override
def transform_entities(
self, entities: Collection[BLEDevice]
self, entities: Collection[Union[BLEDevice, BluetoothDevice]]
) -> Collection[BluetoothDevice]:
return [
BluetoothDevice(
id=dev.address,
name=self._get_device_name(dev),
**parse_device_args(dev),
)
if isinstance(dev, BLEDevice)
else dev
for dev in entities
]
@ -401,7 +498,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
await self._scan_enabled.wait()
entities = await self._scan()
new_device_addresses = {e.id for e in entities}
new_device_addresses = {e.external_id for e in entities}
missing_device_addresses = device_addresses - new_device_addresses
missing_devices = [
dev
@ -412,6 +509,7 @@ class BluetoothBlePlugin(AsyncRunnablePlugin, EntityManager):
for dev in missing_devices:
self._post_event(BluetoothDeviceLostEvent, dev)
self._devices.pop(dev.address, None)
self._entities.pop(dev.address, None)
device_addresses = new_device_addresses

View File

@ -0,0 +1,268 @@
import json
import struct
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional
from bleak.backends.device import BLEDevice
from bleak.backends.scanner import AdvertisementData
from bleak.uuids import uuidstr_to_str
from bluetooth_numbers import company
# pylint: disable=no-name-in-module
from TheengsGateway._decoder import decodeBLE, getAttribute, getProperties
from platypush.entities import Entity
from platypush.entities.batteries import Battery
from platypush.entities.bluetooth import BluetoothDevice
from platypush.entities.electricity import (
CurrentSensor,
EnergySensor,
PowerSensor,
VoltageSensor,
)
from platypush.entities.humidity import HumiditySensor
from platypush.entities.illuminance import IlluminanceSensor
from platypush.entities.motion import MotionSensor
from platypush.entities.sensors import BinarySensor, NumericSensor, RawSensor
from platypush.entities.temperature import TemperatureSensor
@dataclass
class TheengsEntity:
"""
Utility class to store the data parsed from the Theengs library.
"""
data: dict = field(default_factory=dict)
properties: dict = field(default_factory=dict)
brand: Optional[str] = None
model: Optional[str] = None
model_id: Optional[str] = None
# pylint: disable=too-few-public-methods
class NullSensor:
"""
Dummy class to model sensors with null values (hence without sufficient
information for the application to infer the type).
"""
def __init__(self, *_, **__):
pass
# Maps property names to transformer methods (first mapper choice).
_property_to_entity: Dict[str, Callable[[Any, Dict[str, Any]], Entity]] = {
'battery': lambda value, conf: Battery(
value=value,
unit=conf.get('unit', '%'),
min=conf.get('min', 0),
max=conf.get('min', 100),
),
'current': lambda value, conf: CurrentSensor(
value=value,
unit=conf.get('unit', 'A'),
),
'energy': lambda value, conf: EnergySensor(
value=value,
unit=conf.get('unit', 'kWh'),
),
'humidity': lambda value, conf: HumiditySensor(
value=value,
unit=conf.get('unit', '%'),
min=conf.get('min', 0),
max=conf.get('min', 100),
),
'light level': lambda value, _: IlluminanceSensor(value=value),
'power': lambda value, conf: PowerSensor(
value=value,
unit=conf.get('unit', 'W'),
),
'motion': lambda value, _: MotionSensor(value=value),
'temperature': lambda value, conf: TemperatureSensor(
value=value,
unit=conf.get('unit', 'C'),
),
'voltage': lambda value, conf: VoltageSensor(
value=value,
unit=conf.get('unit', 'V'),
),
}
# Maps reported units to transformer methods (second mapper choice).
_unit_to_entity: Dict[str, Callable[[Any, Dict[str, Any]], Entity]] = {
'status': lambda value, _: BinarySensor(value=value),
'int': lambda value, _: NumericSensor(value=value),
'%': lambda value, conf: NumericSensor(
value=value,
unit='%',
min=conf.get('min', 0),
max=conf.get('min', 100),
),
}
# Maps value types to transformer methods (third mapper choice).
_value_type_to_entity: Dict[type, Callable[[Any, Dict[str, Any]], Entity]] = {
bool: lambda value, _: BinarySensor(value=value),
int: lambda value, _: NumericSensor(value=value),
float: lambda value, _: NumericSensor(value=value),
str: lambda value, _: RawSensor(value=value),
bytes: lambda value, _: RawSensor(value=value),
bytearray: lambda value, _: RawSensor(value=value),
}
def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDevice:
"""
Convert the data received from a Bluetooth advertisement packet into a
compatible Platypush :class:`platypush.entity.bluetooth.BluetoothDevice`
entity, with the discovered services and characteristics exposed as children
entities.
"""
theengs_entity = _parse_advertisement_data(data)
parent_entity = BluetoothDevice(
id=device.address,
model=theengs_entity.model,
brand=theengs_entity.brand,
reachable=True,
**parse_device_args(device),
)
parsed_entities = {
# Check if we can infer an entity mapper from the property name.
conf.get('name', name): _property_to_entity.get(
conf.get('name'),
# If not, check if we can infer an entity mapper from the reported unit.
_unit_to_entity.get(
conf.get('unit'),
# If not, check if we can infer an entity mapper from the value type.
_value_type_to_entity.get(
type(theengs_entity.data.get(name)),
# If not, default to a NullSensor.
lambda *_: NullSensor(),
),
),
)(theengs_entity.data.get(name), conf)
for name, conf in theengs_entity.properties.items()
}
for prop, entity in parsed_entities.items():
if isinstance(entity, NullSensor):
# Skip entities that we couldn't parse.
continue
entity.id = f'{parent_entity.id}:{prop}'
entity.name = prop
parent_entity.children.append(entity)
return parent_entity
def _parse_advertisement_data(data: AdvertisementData) -> TheengsEntity:
"""
:param data: The data received from a Bluetooth advertisement packet.
:return: A :class:`platypush.entity.bluetooth.TheengsEntity` instance that
maps the parsed attributes.
"""
entity_args, properties, brand, model, model_id = ({}, {}, None, None, None)
if data.service_data:
parsed_data = list(data.service_data.keys())[0]
# TheengsDecoder only accepts 16 bit uuid's, this converts the 128 bit uuid to 16 bit.
entity_args['servicedatauuid'] = parsed_data[4:8]
parsed_data = str(list(data.service_data.values())[0].hex())
entity_args['servicedata'] = parsed_data
if data.manufacturer_data:
parsed_data = str(
struct.pack('<H', list(data.manufacturer_data.keys())[0]).hex()
)
parsed_data += str(list(data.manufacturer_data.values())[0].hex())
entity_args['manufacturerdata'] = parsed_data
if data.local_name:
entity_args['name'] = data.local_name
if entity_args:
encoded_ret = decodeBLE(json.dumps(entity_args))
if encoded_ret:
entity_args = json.loads(encoded_ret)
if entity_args.get('model_id'):
properties = json.loads(getProperties(entity_args['model_id'])).get(
'properties', {}
)
model = getAttribute(entity_args['model_id'], 'model')
model_id = entity_args.pop('model_id', None)
return TheengsEntity(
data=entity_args,
properties=properties,
brand=brand,
model=model,
model_id=model_id,
)
def parse_device_args(device: BLEDevice) -> Dict[str, Any]:
"""
:param device: The device to parse.
:return: The mapped device arguments required to initialize a
:class:`platypush.entity.bluetooth.BluetoothDevice` or
:class:`platypush.message.event.bluetooth.BluetoothDeviceEvent`
object.
"""
props = device.details.get('props', {})
return {
'name': device.name or device.address,
'connected': props.get('Connected', False),
'paired': props.get('Paired', False),
'blocked': props.get('Blocked', False),
'trusted': props.get('Trusted', False),
'rssi': device.rssi,
'tx_power': props.get('TxPower'),
'uuids': {
uuid: uuidstr_to_str(uuid) for uuid in device.metadata.get('uuids', [])
},
'manufacturers': {
manufacturer_id: company.get(manufacturer_id, 'Unknown')
for manufacturer_id in sorted(
device.metadata.get('manufacturer_data', {}).keys()
)
},
'manufacturer_data': _parse_manufacturer_data(device),
'service_data': _parse_service_data(device),
}
def _parse_manufacturer_data(device: BLEDevice) -> Dict[int, str]:
"""
:param device: The device to parse.
:return: The manufacturer data as a ``manufacturer_id -> hex_string``
mapping.
"""
return {
manufacturer_id: ''.join([f'{x:02x}' for x in value])
for manufacturer_id, value in device.metadata.get(
'manufacturer_data', {}
).items()
}
def _parse_service_data(device: BLEDevice) -> Dict[str, str]:
"""
:param device: The device to parse.
:return: The service data as a ``service_uuid -> hex_string`` mapping.
"""
return {
service_uuid: ''.join([f'{x:02x}' for x in value])
for service_uuid, value in device.details.get('props', {})
.get('ServiceData', {})
.items()
}

View File

@ -1,7 +1,22 @@
manifest:
events: {}
events:
platypush.message.event.bluetooth.BluetoothDeviceBlockedEvent:
platypush.message.event.bluetooth.BluetoothDeviceConnectedEvent:
platypush.message.event.bluetooth.BluetoothDeviceDisconnectedEvent:
platypush.message.event.bluetooth.BluetoothDeviceFoundEvent:
platypush.message.event.bluetooth.BluetoothDeviceLostEvent:
platypush.message.event.bluetooth.BluetoothDeviceNewDataEvent:
platypush.message.event.bluetooth.BluetoothDevicePairedEvent:
platypush.message.event.bluetooth.BluetoothDeviceTrustedEvent:
platypush.message.event.bluetooth.BluetoothDeviceUnblockedEvent:
platypush.message.event.bluetooth.BluetoothDeviceUnpairedEvent:
platypush.message.event.bluetooth.BluetoothDeviceUntrustedEvent:
platypush.message.event.bluetooth.BluetoothScanPausedEvent:
platypush.message.event.bluetooth.BluetoothScanResumedEvent:
install:
pip:
- bleak
- bluetooth-numbers
- git+https://github.com/BlackLight/TheengsGateway
package: platypush.plugins.bluetooth.ble
type: plugin

View File

@ -53,7 +53,7 @@ class MusicMpdPlugin(MusicPlugin):
while n_tries > 0:
try:
n_tries -= 1
self.client = mpd.MPDClient(use_unicode=True)
self.client = mpd.MPDClient()
self.client.connect(self.host, self.port)
return self.client
except Exception as e:

View File

@ -16,7 +16,7 @@ from platypush.entities.motion import MotionSensor
from platypush.entities.sensors import (
BinarySensor,
EnumSensor,
MultiValueSensor,
CompositeSensor,
NumericSensor,
)
from platypush.entities.switches import EnumSwitch, Switch
@ -492,7 +492,7 @@ device_mappers: List[DeviceMapper] = [
),
# three_axis
DeviceMapper(
entity_type=MultiValueSensor,
entity_type=CompositeSensor,
capability=Capability.three_axis,
attribute=Attribute.three_axis,
value_type=list,

View File

@ -50,6 +50,7 @@ class WebsocketPlugin(AsyncRunnablePlugin):
ssl_cafile=None,
ssl_capath=None,
wait_response=False,
timeout=None,
):
"""
Sends a message to a websocket.
@ -66,6 +67,9 @@ class WebsocketPlugin(AsyncRunnablePlugin):
required by the SSL configuration (default: None)
:param wait_response: Set to True if you expect a response to the
delivered message.
:param timeout: If ``wait_response=True``, then ``timeout`` establishes
how long we should wait for a response before returning (default:
no timeout).
:return: The received response if ``wait_response`` is set to True,
otherwise nothing.
"""
@ -87,10 +91,13 @@ class WebsocketPlugin(AsyncRunnablePlugin):
try:
await ws.send(str(msg))
except ConnectionClosed as err:
self.logger.warning('Error on websocket %s: %s', url, err)
self.logger.warning(
'Connection error to websocket %s: %s', url, err
)
if wait_response:
messages = await self._recv(ws, num_messages=1)
messages = await self._recv(ws, num_messages=1, timeout=timeout)
if messages:
return self._parse_msg(messages[0])
@ -145,7 +152,7 @@ class WebsocketPlugin(AsyncRunnablePlugin):
return self.loop.call_soon_threadsafe(recv)
async def _recv(self, ws, timeout=0, num_messages=0):
async def _recv(self, ws, timeout: Optional[float] = 0, num_messages=0):
messages = []
time_start = time.time()
time_end = time_start + timeout if timeout else 0

View File

@ -12,10 +12,10 @@ python-dateutil
tz
frozendict
requests
sqlalchemy
sqlalchemy<2.0.0
bcrypt
rsa
zeroconf
zeroconf>=0.27.0
paho-mqtt
websocket-client
croniter

View File

@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.24.4
current_version = 0.24.5
commit = True
tag = True

View File

@ -28,7 +28,7 @@ backend = pkg_files('platypush/backend')
setup(
name="platypush",
version="0.24.4",
version="0.24.5",
author="Fabio Manganiello",
author_email="info@fabiomanganiello.com",
description="Platypush service",
@ -57,7 +57,7 @@ setup(
'redis',
'requests',
'croniter',
'sqlalchemy',
'sqlalchemy<2.0.0',
'websockets',
'websocket-client',
'wheel',
@ -173,9 +173,11 @@ setup(
],
# Support for Alexa/Echo plugin
'alexa': ['avs @ https://github.com/BlackLight/avs/tarball/master'],
# Support for bluetooth devices
# Support for Bluetooth devices
'bluetooth': [
'bleak',
'bluetooth-numbers',
'pybluez',
'pyobex @ https://github.com/BlackLight/PyOBEX/tarball/master',
],
# Support for TP-Link devices