@ -1 +1 @@
@ -1,2 +0,0 @@
@ -1,2 +0,0 @@
@ -1,7 +1,4 @@
{ {
"bluetooth": {
"class": "fab fa-bluetooth"
"": { "": {
"class": "fab fa-android" "class": "fab fa-android"
}, },

@ -1,75 +0,0 @@
<div class="entity bluetooth-service-container">
<div class="head">
<div class="col-1 icon">
:error="error" />
<div class="col-9 label">
<div class="name" v-text="" />
<div class="col-2 connector pull-right">
@click.stop />
import ToggleSwitch from "@/components/elements/ToggleSwitch"
import EntityIcon from "./EntityIcon"
import EntityMixin from "./EntityMixin"
export default {
name: 'BluetoothService',
components: {ToggleSwitch, EntityIcon},
mixins: [EntityMixin],
methods: {
async connect(event) {
this.$emit('loading', true)
try {
await this.request('bluetooth.connect', {
device: this.parent.address,
service_uuid: this.uuid,
} finally {
this.$emit('loading', false)
async disconnect(event) {
this.$emit('loading', true)
try {
await this.request('bluetooth.disconnect', {
device: this.parent.address,
} finally {
this.$emit('loading', false)
<style lang="scss" scoped>
@import "common";
.switch-container {
.switch {
direction: rtl;

@ -7,7 +7,6 @@
<component <component
:is="component" :is="component"
:value="value" :value="value"
:loading="loading" :loading="loading"
ref="instance" ref="instance"
:error="error || value?.reachable == false" :error="error || value?.reachable == false"
@ -27,7 +26,6 @@
<div class="child" v-for="entity in computedChildren" :key=""> <div class="child" v-for="entity in computedChildren" :key="">
<Entity <Entity
:value="entity" :value="entity"
:loading="loading" :loading="loading"
:level="level + 1" :level="level + 1"
@input="$emit('input', entity)" /> @input="$emit('input', entity)" />

@ -21,11 +21,6 @@ export default {
required: true, required: true,
}, },
parent: {
type: Object,
default: () => {},
children: { children: {
type: Object, type: Object,
default: () => {}, default: () => {},

@ -39,14 +39,6 @@
} }
}, },
"bluetooth_service": {
"name": "Service",
"name_plural": "Services",
"icon": {
"class": "fas fa-satellite-dish"
"device": { "device": {
"name": "Device", "name": "Device",
"name_plural": "Devices", "name_plural": "Devices",

@ -5,7 +5,6 @@ from typing import Collection, Optional
from ._base import ( from ._base import (
Entity, Entity,
EntitySavedCallback, EntitySavedCallback,
get_entities_registry, get_entities_registry,
init_entities_db, init_entities_db,
@ -81,7 +80,6 @@ __all__ = (
'DimmerEntityManager', 'DimmerEntityManager',
'EntitiesEngine', 'EntitiesEngine',
'Entity', 'Entity',
'EntityManager', 'EntityManager',
'EntitySavedCallback', 'EntitySavedCallback',
'EnumSwitchEntityManager', 'EnumSwitchEntityManager',

@ -27,11 +27,6 @@ from platypush.message import JSONAble
EntityRegistryType = Dict[str, Type['Entity']] EntityRegistryType = Dict[str, Type['Entity']]
entities_registry: EntityRegistryType = {} entities_registry: EntityRegistryType = {}
EntityKey = Tuple[str, str]
""" The entity's logical key, as an ``<external_id, plugin>`` tuple. """
EntityMapping = Dict[EntityKey, 'Entity']
""" Internal mapping for entities used for deduplication/merge/upsert. """
_import_error_ignored_modules: Final[Set[str]] = {'bluetooth'} _import_error_ignored_modules: Final[Set[str]] = {'bluetooth'}
""" """
ImportError exceptions will be ignored for these entity submodules when ImportError exceptions will be ignored for these entity submodules when
@ -115,7 +110,7 @@ if 'entity' not in Base.metadata:
return tuple(inspector.mapper.column_attrs) return tuple(inspector.mapper.column_attrs)
@property @property
def entity_key(self) -> EntityKey: def entity_key(self) -> Tuple[str, str]:
""" """
This method returns the "external" key of an entity. This method returns the "external" key of an entity.
""" """

@ -1,13 +1,13 @@
from logging import getLogger from logging import getLogger
from threading import Thread, Event from threading import Thread, Event
from typing import Dict, Optional from typing import Dict, Optional, Tuple
from platypush.context import get_bus from platypush.context import get_bus
from platypush.entities import Entity from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent from platypush.message.event.entities import EntityUpdateEvent
from platypush.utils import set_thread_name from platypush.utils import set_thread_name
from platypush.entities._base import EntityKey, EntitySavedCallback from platypush.entities._base import EntitySavedCallback
from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.queue import EntitiesQueue
from platypush.entities._engine.repo import EntitiesRepository from platypush.entities._engine.repo import EntitiesRepository
@ -46,7 +46,7 @@ class EntitiesEngine(Thread):
""" Queue where all entity upsert requests are received.""" """ Queue where all entity upsert requests are received."""
self._repo = EntitiesRepository() self._repo = EntitiesRepository()
""" The repository of the processed entities. """ """ The repository of the processed entities. """
self._callbacks: Dict[EntityKey, EntitySavedCallback] = {} self._callbacks: Dict[Tuple[str, str], EntitySavedCallback] = {}
""" (external_id, plugin) -> callback mapping""" """ (external_id, plugin) -> callback mapping"""
def post(self, *entities: Entity, callback: Optional[EntitySavedCallback] = None): def post(self, *entities: Entity, callback: Optional[EntitySavedCallback] = None):

@ -1,9 +1,9 @@
import logging import logging
from typing import Dict, Iterable, Optional, Tuple from typing import Dict, Iterable, Tuple
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from platypush.entities._base import Entity, EntityMapping from platypush.entities import Entity
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from platypush.entities._engine.repo.db import EntitiesDb from platypush.entities._engine.repo.db import EntitiesDb
@ -20,7 +20,7 @@ class EntitiesRepository:
def __init__(self): def __init__(self):
self._db = EntitiesDb() self._db = EntitiesDb()
self._merge = EntitiesMerger() self._merger = EntitiesMerger(self)
def get( def get(
self, session: Session, entities: Iterable[Entity] self, session: Session, entities: Iterable[Entity]
@ -43,63 +43,7 @@ class EntitiesRepository:
autocommit=False, autocommit=False,
expire_on_commit=False, expire_on_commit=False,
) as session: ) as session:
merged_entities = self._merge( merged_entities = self._merger.merge(session, entities)
existing_entities=self._fetch_all_and_flatten(session, entities),
merged_entities = self._db.upsert(session, merged_entities) merged_entities = self._db.upsert(session, merged_entities)
return merged_entities return merged_entities
def _fetch_all_and_flatten(
session: Session,
entities: Iterable[Entity],
) -> EntityMapping:
Given a collection of entities, retrieves their persisted instances
(lookup is performed by ``entity_key``), and it also recursively
expands their relationships, so the session is updated with the latest
persisted versions of all the objects in the hierarchy.
:return: An ``entity_key -> entity`` mapping.
expanded_entities = {}
for entity in entities:
root_entity = self._get_root_entity(session, entity)
return self.get(session, expanded_entities.values())
def _expand_children(
entities: Iterable[Entity],
all_entities: Optional[EntityMapping] = None,
) -> EntityMapping:
Recursively expands and flattens all the children of a set of entities
into an ``entity_key -> entity`` mapping.
all_entities = all_entities or {}
for entity in entities:
all_entities[entity.entity_key] = entity
cls._expand_children(entity.children, all_entities)
return all_entities
def _get_root_entity(self, session: Session, entity: Entity) -> Entity:
Retrieve the root entity (i.e. the one with a null parent) of an
parent = entity
while parent:
parent = self._merge.get_parent(session, entity)
if parent:
entity = parent
return entity

@ -6,7 +6,7 @@ from sqlalchemy import and_, or_
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.entities._base import Entity from platypush.entities import Entity
@dataclass @dataclass

@ -1,30 +1,34 @@
from typing import Iterable, List, Optional from typing import Dict, Iterable, List, Optional, Tuple
from sqlalchemy.orm import Session, exc from sqlalchemy.orm import Session, exc
from platypush.entities._base import Entity, EntityMapping from platypush.entities import Entity
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
class EntitiesMerger: class EntitiesMerger:
""" """
A stateless functor in charge of detecting and merging entities that This object is in charge of detecting and merging entities that already
already exist on the database before flushing the session. exist on the database before flushing the session.
""" """
def __call__( def __init__(self, repository):
from . import EntitiesRepository
self._repo: EntitiesRepository = repository
def merge(
self, self,
session: Session, session: Session,
entities: Iterable[Entity], entities: Iterable[Entity],
existing_entities: Optional[EntityMapping] = None,
) -> List[Entity]: ) -> List[Entity]:
""" """
Merge a set of entities with their existing representations and update Merge a set of entities with their existing representations and update
the parent/child relationships and return a list containing the parent/child relationships and return a tuple with
``[*updated_entities, *new_entities]``. ``[new_entities, updated_entities]``.
""" """
existing_entities = existing_entities or {} new_entities: Dict[Tuple[str, str], Entity] = {}
new_entities: EntityMapping = {} existing_entities: Dict[Tuple[str, str], Entity] = {}
self._merge( self._merge(
session, session,
@ -33,164 +37,156 @@ class EntitiesMerger:
existing_entities=existing_entities, existing_entities=existing_entities,
) )
return list({**existing_entities, **new_entities}.values()) return [*existing_entities.values(), *new_entities.values()]
def _merge( def _merge(
self, self,
session: Session, session: Session,
entities: Iterable[Entity], entities: Iterable[Entity],
new_entities: EntityMapping, new_entities: Dict[Tuple[str, str], Entity],
existing_entities: EntityMapping, existing_entities: Dict[Tuple[str, str], Entity],
) -> List[Entity]: ) -> List[Entity]:
""" """
(Recursive) inner implementation of the entity merge logic. (Recursive) inner implementation of the entity merge logic.
""" """
processed_entities = [] processed_entities = []
existing_entities.update(self._repo.get(session, entities))
# Make sure that we have no duplicate entity keys in the current batch
entities = list(
**({e.entity_key: e for e in entities}),
e.entity_key: e
for e in {str( ee for ee in entities if}.values()
# Retrieve existing records and merge them # Retrieve existing records and merge them
for entity in entities: for entity in entities:
key = entity.entity_key key = entity.entity_key
existing_entity = existing_entities.get(key, new_entities.get(key)) existing_entity = existing_entities.get(key, new_entities.get(key))
parent_id, parent = self._update_parent(session, entity, new_entities)
# Synchronize the parent(s)
entity = self._sync_parent(session, entity, new_entities, existing_entities)
if existing_entity: if existing_entity:
# Merge the columns with those of the existing entity # Update the parent
existing_entity = self._merge_columns(entity, existing_entity) if not parent_id and parent:
# Merge the children existing_entity.parent = parent
self._append_children( else:
existing_entity, existing_entity.parent_id = parent_id
# Use the existing entity now that it's been merged # Merge the other columns
self._merge_columns(entity, existing_entity)
# Merge the children
self._merge(session, entity.children, new_entities, existing_entities)
# Use the updated version of the existing entity.
entity = existing_entity entity = existing_entity
else: else:
# Add it to the map of new entities if the entity doesn't exist on the db # Add it to the map of new entities if the entity doesn't exist
# on the repo
new_entities[key] = entity new_entities[key] = entity
processed_entities.append(entity) processed_entities.append(entity)
return processed_entities return processed_entities
@classmethod def _update_parent(
def _sync_parent( self,
session: Session, session: Session,
entity: Entity, entity: Entity,
new_entities: EntityMapping, new_entities: Dict[Tuple[str, str], Entity],
existing_entities: EntityMapping, ) -> Tuple[Optional[int], Optional[Entity]]:
) -> Entity:
""" """
Recursively refresh the parent of an entity all the way up in the Recursively update the hierarchy of an entity, moving upwards towards
hierarchy, to make sure that all the parent/child relations are the parent.
appropriately rewired and that all the relevant objects are added to
this session.
parent = cls.get_parent(session, entity)
if not parent:
# No parent -> we can terminate the recursive climbing
return entity
# Check if an entity with the same key as the reported parent already
# exists in the cached entities
existing_parent = existing_entities.get(
parent.entity_key, new_entities.get(parent.entity_key)
if not existing_parent:
# No existing parent -> we need to flush the one reported by this
# entity
return entity
# Check if the existing parent already has a child with the same key as
# this entity
existing_entity = next(
for child in existing_parent.children
if child.entity_key == entity.entity_key
if not existing_entity:
# If this entity isn't currently a member of the existing parent,
# temporarily reset the parent of the current entity, so we won't
# carry stale objects around. We will soon rewire it to the
# existing parent.
entity.parent = None
# Otherwise, merge the columns of the existing entity with those of
# the new entity and use the existing entity
entity = cls._merge_columns(entity, existing_entity)
# Refresh the existing collection of children with the new/updated
# entity
cls._append_children(existing_parent, entity)
# Recursively call this function to synchronize any parent entities up
# in the taxonomy
cls._sync_parent(session, existing_parent, new_entities, existing_entities)
return entity
def get_parent(session: Session, entity: Entity) -> Optional[Entity]:
Gets the parent of an entity, and it fetches if it's not available in
the current session.
""" """
parent_id: Optional[int] = entity.parent_id
try: try:
return entity.parent parent: Optional[Entity] = entity.parent
except exc.DetachedInstanceError: except exc.DetachedInstanceError:
# Dirty fix for `Parent instance <...> is not bound to a Session; # Dirty fix for `Parent instance <...> is not bound to a Session;
# lazy load operation of attribute 'parent' cannot proceed` # lazy load operation of attribute 'parent' cannot proceed
return ( parent = session.query(Entity).get(parent_id) if parent_id else None
if entity.parent_id # If the entity has a parent with an ID, use that
else None if parent and
parent_id = parent_id or
# If there's no parent_id but there is a parent object, try to fetch
# its stored version
if not parent_id and parent:
batch = list(self._repo.get(session, [parent]).values())
# If the parent is already stored, use its ID
if batch:
parent = batch[0]
parent_id =
# Otherwise, check if its key is already among those awaiting flush
# and reuse the same objects (prevents SQLAlchemy from generating
# duplicate inserts)
temp_entity = new_entities.get(parent.entity_key)
if temp_entity:
self._remove_duplicate_children(entity, temp_entity)
parent = entity.parent = temp_entity
new_entities[parent.entity_key] = parent
# Recursively apply any changes up in the hierarchy
self._update_parent(session, parent, new_entities=new_entities)
# If we found a parent_id, populate it on the entity (and remove the
# supporting relationship object so SQLAlchemy doesn't go nuts when
# flushing)
if parent_id:
entity.parent = None
entity.parent_id = parent_id
return parent_id, parent
def _remove_duplicate_children(entity: Entity, parent: Optional[Entity] = None):
if not parent:
# Make sure that an entity has no duplicate entity IDs among its
# children
existing_child_index_by_id = None
existing_child_index_by_id = [ for e in parent.children].index(
except ValueError:
# Make sure that an entity has no duplicate entity keys among its
# children
existing_child_index_by_key = None
existing_child_index_by_key = [e.entity_key for e in parent.children].index(
) )
except ValueError:
@staticmethod @classmethod
def _append_children(entity: Entity, *children: Entity): def _merge_columns(cls, entity: Entity, existing_entity: Entity) -> Entity:
Update the list of children of a given entity with the given list of
Note that, in case of ``entity_key`` conflict (the key of a new entity
already exists in the entity's children), the most recent version will
be used, so any column merge logic needs to happen before this method
is called.
entity.children = list(
**{e.entity_key: e for e in entity.children},
**{e.entity_key: e for e in children},
for child in children:
child.parent = entity
child.parent_id =
def _merge_columns(entity: Entity, existing_entity: Entity) -> Entity:
""" """
Merge two versions of an entity column by column. Merge two versions of an entity column by column.
""" """
columns = [col.key for col in entity.columns] columns = [col.key for col in entity.columns]
for col in columns: for col in columns:
if col == 'meta': if col == 'meta':
existing_entity.meta = { # type: ignore existing_entity.meta = {
**(existing_entity.meta or {}), # type: ignore **(existing_entity.meta or {}),
**(entity.meta or {}), # type: ignore **(entity.meta or {}),
} }
elif col not in ('id', 'created_at'): elif col not in ('id', 'created_at'):
setattr(existing_entity, col, getattr(entity, col)) setattr(existing_entity, col, getattr(entity, col))

@ -158,6 +158,10 @@ if 'bluetooth_device' not in Base.metadata:
def to_dict(self): def to_dict(self):
""" """
Overwrites ``to_dict`` to transform private column names into their Overwrites ``to_dict`` to transform private column names into their
public representation. public representation, and also include the exposed services and
child entities.
""" """
return {k.lstrip('_'): v for k, v in super().to_dict().items()} return {
**{k.lstrip('_'): v for k, v in super().to_dict().items()},
'children': [child.to_dict() for child in self.children],

@ -48,9 +48,6 @@ if 'bluetooth_service' not in Base.metadata:
is_ble = Column(Boolean, default=False) is_ble = Column(Boolean, default=False)
""" Whether the service is a BLE service. """ """ Whether the service is a BLE service. """
connected = Column(Boolean, default=False)
""" Whether an active connection exists to this service. """
__mapper_args__ = { __mapper_args__ = {
'polymorphic_identity': __tablename__, 'polymorphic_identity': __tablename__,
} }

@ -216,7 +216,7 @@ def _parse_services(device: BLEDevice) -> List[BluetoothService]:
BluetoothService( BluetoothService(
id=f'{device.address}:{uuid}', id=f'{device.address}:{uuid}',
uuid=uuid, uuid=uuid,
name=f'[{uuid}]' if srv_cls == ServiceClass.UNKNOWN else str(srv_cls), name=str(srv_cls),
protocol=Protocol.L2CAP, protocol=Protocol.L2CAP,
is_ble=True, is_ble=True,
) )
@ -236,7 +236,9 @@ def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDev
theengs_entity = _parse_advertisement_data(data) theengs_entity = _parse_advertisement_data(data)
props = (device.details or {}).get('props', {}) props = (device.details or {}).get('props', {})
manufacturer = theengs_entity.manufacturer or company.get( manufacturer = theengs_entity.manufacturer or company.get(
next(iter(key for key in device.metadata['manufacturer_data']), 0xFFFF) list(device.metadata['manufacturer_data'].keys())[0]
if device.metadata.get('manufacturer_data', {})
else None
) )
parent_entity = BluetoothDevice( parent_entity = BluetoothDevice(
@ -277,8 +279,8 @@ def device_to_entity(device: BLEDevice, data: AdvertisementData) -> BluetoothDev
# Skip entities that we couldn't parse. # Skip entities that we couldn't parse.
continue continue = f'{parent_entity.address}::{prop}' = f'{}:{prop}' = prop.title() = prop
parent_entity.children.append(entity) parent_entity.children.append(entity)
entity.parent = parent_entity entity.parent = parent_entity

@ -174,23 +174,15 @@ class LegacyManager(BaseBluetoothManager):
raise AssertionError(f'Connection to {device} timed out') from e raise AssertionError(f'Connection to {device} timed out') from e
dev.connected = True dev.connected = True
conn.service.connected = True
self.notify(BluetoothDeviceConnectedEvent, dev) self.notify(BluetoothDeviceConnectedEvent, dev)
yield conn yield conn
# Close the connection once the context is over # Close the connection once the context is over
with self._connection_locks[conn.key]: with self._connection_locks[conn.key]:
try: conn.close()
except Exception as e:
'Error while closing the connection to %s: %s', device, e
self._connections.pop(conn.key, None) self._connections.pop(conn.key, None)
dev.connected = False dev.connected = False
conn.service.connected = False
self.notify(BluetoothDeviceDisconnectedEvent, dev) self.notify(BluetoothDeviceDisconnectedEvent, dev)
@override @override

@ -86,7 +86,7 @@ class SwitchTplinkPlugin(RunnablePlugin, SwitchEntityManager):
devices: Optional[Mapping[str, SmartDevice]] = None, devices: Optional[Mapping[str, SmartDevice]] = None,
publish_entities: bool = True, publish_entities: bool = True,
): ):
for addr, info in self._static_devices.items(): for (addr, info) in self._static_devices.items():
try: try:
dev = info['type'](addr) dev = info['type'](addr)
self._alias_to_dev[info.get('name', dev.alias)] = dev self._alias_to_dev[info.get('name', dev.alias)] = dev
@ -94,7 +94,7 @@ class SwitchTplinkPlugin(RunnablePlugin, SwitchEntityManager):
except SmartDeviceException as e: except SmartDeviceException as e:
self.logger.warning('Could not communicate with device %s: %s', addr, e) self.logger.warning('Could not communicate with device %s: %s', addr, e)
for ip, dev in (devices or {}).items(): for (ip, dev) in (devices or {}).items():
self._ip_to_dev[ip] = dev self._ip_to_dev[ip] = dev
self._alias_to_dev[dev.alias] = dev self._alias_to_dev[dev.alias] = dev
@ -225,7 +225,7 @@ class SwitchTplinkPlugin(RunnablePlugin, SwitchEntityManager):
return [self._serialize(dev) for dev in self._scan().values()] return [self._serialize(dev) for dev in self._scan().values()]
def main(self): def main(self):
devices = {ip: self._serialize(dev) for ip, dev in self._ip_to_dev.items()} devices = {ip: self._serialize(dev) for ip, dev in self._ip_to_dev}
while not self.should_stop(): while not self.should_stop():
new_devices = self._scan(publish_entities=False) new_devices = self._scan(publish_entities=False)