diff --git a/platypush/entities/__init__.py b/platypush/entities/__init__.py index 6317419d..5d29d40d 100644 --- a/platypush/entities/__init__.py +++ b/platypush/entities/__init__.py @@ -5,6 +5,7 @@ from typing import Collection, Optional from ._base import ( Entity, + EntityKey, EntitySavedCallback, get_entities_registry, init_entities_db, @@ -80,6 +81,7 @@ __all__ = ( 'DimmerEntityManager', 'EntitiesEngine', 'Entity', + 'EntityKey', 'EntityManager', 'EntitySavedCallback', 'EnumSwitchEntityManager', diff --git a/platypush/entities/_base.py b/platypush/entities/_base.py index 3ba822a2..d02d0e44 100644 --- a/platypush/entities/_base.py +++ b/platypush/entities/_base.py @@ -27,6 +27,11 @@ from platypush.message import JSONAble EntityRegistryType = Dict[str, Type['Entity']] entities_registry: EntityRegistryType = {} +EntityKey = Tuple[str, str] +""" The entity's logical key, as an ``(external_id, plugin)`` tuple. """ +EntityMapping = Dict[EntityKey, 'Entity'] +""" Internal mapping for entities used for deduplication/merge/upsert. """ + _import_error_ignored_modules: Final[Set[str]] = {'bluetooth'} """ ImportError exceptions will be ignored for these entity submodules when @@ -110,7 +115,7 @@ if 'entity' not in Base.metadata: return tuple(inspector.mapper.column_attrs) @property - def entity_key(self) -> Tuple[str, str]: + def entity_key(self) -> EntityKey: """ This method returns the "external" key of an entity. 
""" diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index 7f5df636..c23c4b34 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -1,13 +1,13 @@ from logging import getLogger from threading import Thread, Event -from typing import Dict, Optional, Tuple +from typing import Dict, Optional from platypush.context import get_bus from platypush.entities import Entity from platypush.message.event.entities import EntityUpdateEvent from platypush.utils import set_thread_name -from platypush.entities._base import EntitySavedCallback +from platypush.entities._base import EntityKey, EntitySavedCallback from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.repo import EntitiesRepository @@ -46,7 +46,7 @@ class EntitiesEngine(Thread): """ Queue where all entity upsert requests are received.""" self._repo = EntitiesRepository() """ The repository of the processed entities. """ - self._callbacks: Dict[Tuple[str, str], EntitySavedCallback] = {} + self._callbacks: Dict[EntityKey, EntitySavedCallback] = {} """ (external_id, plugin) -> callback mapping""" def post(self, *entities: Entity, callback: Optional[EntitySavedCallback] = None): diff --git a/platypush/entities/_engine/repo/__init__.py b/platypush/entities/_engine/repo/__init__.py index 30665513..49b0b8a3 100644 --- a/platypush/entities/_engine/repo/__init__.py +++ b/platypush/entities/_engine/repo/__init__.py @@ -1,9 +1,9 @@ import logging -from typing import Dict, Iterable, Tuple +from typing import Dict, Iterable, Optional, Tuple from sqlalchemy.orm import Session -from platypush.entities import Entity +from platypush.entities._base import Entity, EntityMapping # pylint: disable=no-name-in-module from platypush.entities._engine.repo.db import EntitiesDb @@ -20,7 +20,7 @@ class EntitiesRepository: def __init__(self): self._db = EntitiesDb() - self._merger = EntitiesMerger(self) + self._merge = 
EntitiesMerger() def get( self, session: Session, entities: Iterable[Entity] @@ -43,7 +43,63 @@ class EntitiesRepository: autocommit=False, expire_on_commit=False, ) as session: - merged_entities = self._merger.merge(session, entities) + merged_entities = self._merge( + session, + entities, + existing_entities=self._fetch_all_and_flatten(session, entities), + ) + merged_entities = self._db.upsert(session, merged_entities) return merged_entities + + def _fetch_all_and_flatten( + self, + session: Session, + entities: Iterable[Entity], + ) -> EntityMapping: + """ + Given a collection of entities, retrieves their persisted instances + (lookup is performed by ``entity_key``), and it also recursively + expands their relationships, so the session is updated with the latest + persisted versions of all the objects in the hierarchy. + + :return: An ``entity_key -> entity`` mapping. + """ + expanded_entities = {} + for entity in entities: + root_entity = self._get_root_entity(session, entity) + expanded_entities.update(self._expand_children([root_entity])) + expanded_entities.update(self._expand_children([entity])) + + return self.get(session, expanded_entities.values()) + + @classmethod + def _expand_children( + cls, + entities: Iterable[Entity], + all_entities: Optional[EntityMapping] = None, + ) -> EntityMapping: + """ + Recursively expands and flattens all the children of a set of entities + into an ``entity_key -> entity`` mapping. + """ + all_entities = all_entities or {} + for entity in entities: + all_entities[entity.entity_key] = entity + cls._expand_children(entity.children, all_entities) + + return all_entities + + def _get_root_entity(self, session: Session, entity: Entity) -> Entity: + """ + Retrieve the root entity (i.e. the one with a null parent) of an + entity. 
+ """ + parent = entity + while parent: + parent = self._merge.get_parent(session, entity) + if parent: + entity = parent + + return entity diff --git a/platypush/entities/_engine/repo/db.py b/platypush/entities/_engine/repo/db.py index 4eeba786..aab1187a 100644 --- a/platypush/entities/_engine/repo/db.py +++ b/platypush/entities/_engine/repo/db.py @@ -6,7 +6,7 @@ from sqlalchemy import and_, or_ from sqlalchemy.orm import Session from platypush.context import get_plugin -from platypush.entities import Entity +from platypush.entities._base import Entity @dataclass diff --git a/platypush/entities/_engine/repo/merger.py b/platypush/entities/_engine/repo/merger.py index eb8681a1..6f3e9b34 100644 --- a/platypush/entities/_engine/repo/merger.py +++ b/platypush/entities/_engine/repo/merger.py @@ -1,34 +1,30 @@ -from typing import Dict, Iterable, List, Optional, Tuple +from typing import Iterable, List, Optional from sqlalchemy.orm import Session, exc -from platypush.entities import Entity +from platypush.entities._base import Entity, EntityMapping # pylint: disable=too-few-public-methods class EntitiesMerger: """ - This object is in charge of detecting and merging entities that already - exist on the database before flushing the session. + A stateless functor in charge of detecting and merging entities that + already exist on the database before flushing the session. """ - def __init__(self, repository): - from . import EntitiesRepository - - self._repo: EntitiesRepository = repository - - def merge( + def __call__( self, session: Session, entities: Iterable[Entity], + existing_entities: Optional[EntityMapping] = None, ) -> List[Entity]: """ Merge a set of entities with their existing representations and update - the parent/child relationships and return a tuple with - ``[new_entities, updated_entities]``. + the parent/child relationships and return a list containing + ``[*updated_entities, *new_entities]``. 
""" - new_entities: Dict[Tuple[str, str], Entity] = {} - existing_entities: Dict[Tuple[str, str], Entity] = {} + existing_entities = existing_entities or {} + new_entities: EntityMapping = {} self._merge( session, @@ -37,156 +33,164 @@ class EntitiesMerger: existing_entities=existing_entities, ) - return [*existing_entities.values(), *new_entities.values()] + return list({**existing_entities, **new_entities}.values()) def _merge( self, session: Session, entities: Iterable[Entity], - new_entities: Dict[Tuple[str, str], Entity], - existing_entities: Dict[Tuple[str, str], Entity], + new_entities: EntityMapping, + existing_entities: EntityMapping, ) -> List[Entity]: """ (Recursive) inner implementation of the entity merge logic. """ processed_entities = [] - existing_entities.update(self._repo.get(session, entities)) - - # Make sure that we have no duplicate entity keys in the current batch - entities = list( - { - **({e.entity_key: e for e in entities}), - **( - { - e.entity_key: e - for e in {str(ee.id): ee for ee in entities if ee.id}.values() - } - ), - }.values() - ) # Retrieve existing records and merge them for entity in entities: key = entity.entity_key existing_entity = existing_entities.get(key, new_entities.get(key)) - parent_id, parent = self._update_parent(session, entity, new_entities) + + # Synchronize the parent(s) + entity = self._sync_parent(session, entity, new_entities, existing_entities) if existing_entity: - # Update the parent - if not parent_id and parent: - existing_entity.parent = parent - else: - existing_entity.parent_id = parent_id - - # Merge the other columns - self._merge_columns(entity, existing_entity) + # Merge the columns with those of the existing entity + existing_entity = self._merge_columns(entity, existing_entity) # Merge the children - self._merge(session, entity.children, new_entities, existing_entities) - # Use the updated version of the existing entity. 
+ self._append_children( + existing_entity, + *self._merge( + session, + entity.children, + new_entities, + existing_entities, + ) + ) + + # Use the existing entity now that it's been merged entity = existing_entity else: - # Add it to the map of new entities if the entity doesn't exist - # on the repo + # Add it to the map of new entities if the entity doesn't exist on the db new_entities[key] = entity processed_entities.append(entity) return processed_entities - def _update_parent( - self, + @classmethod + def _sync_parent( + cls, session: Session, entity: Entity, - new_entities: Dict[Tuple[str, str], Entity], - ) -> Tuple[Optional[int], Optional[Entity]]: + new_entities: EntityMapping, + existing_entities: EntityMapping, + ) -> Entity: """ - Recursively update the hierarchy of an entity, moving upwards towards - the parent. + Recursively refresh the parent of an entity all the way up in the + hierarchy, to make sure that all the parent/child relations are + appropriately rewired and that all the relevant objects are added to + this session. 
""" - parent_id: Optional[int] = entity.parent_id - try: - parent: Optional[Entity] = entity.parent - except exc.DetachedInstanceError: - # Dirty fix for `Parent instance <...> is not bound to a Session; - # lazy load operation of attribute 'parent' cannot proceed - parent = session.query(Entity).get(parent_id) if parent_id else None + parent = cls.get_parent(session, entity) + if not parent: + # No parent -> we can terminate the recursive climbing + return entity - # If the entity has a parent with an ID, use that - if parent and parent.id: - parent_id = parent_id or parent.id + # Check if an entity with the same key as the reported parent already + # exists in the cached entities + existing_parent = existing_entities.get( + parent.entity_key, new_entities.get(parent.entity_key) + ) - # If there's no parent_id but there is a parent object, try to fetch - # its stored version - if not parent_id and parent: - batch = list(self._repo.get(session, [parent]).values()) + if not existing_parent: + # No existing parent -> we need to flush the one reported by this + # entity + return entity - # If the parent is already stored, use its ID - if batch: - parent = batch[0] - parent_id = parent.id + # Check if the existing parent already has a child with the same key as + # this entity + existing_entity = next( + iter( + child + for child in existing_parent.children + if child.entity_key == entity.entity_key + ), + None, + ) - # Otherwise, check if its key is already among those awaiting flush - # and reuse the same objects (prevents SQLAlchemy from generating - # duplicate inserts) - else: - temp_entity = new_entities.get(parent.entity_key) - if temp_entity: - self._remove_duplicate_children(entity, temp_entity) - parent = entity.parent = temp_entity - else: - new_entities[parent.entity_key] = parent - - # Recursively apply any changes up in the hierarchy - self._update_parent(session, parent, new_entities=new_entities) - - # If we found a parent_id, populate it on the entity 
(and remove the - # supporting relationship object so SQLAlchemy doesn't go nuts when - # flushing) - if parent_id: + if not existing_entity: + # If this entity isn't currently a member of the existing parent, + # temporarily reset the parent of the current entity, so we won't + # carry stale objects around. We will soon rewire it to the + # existing parent. entity.parent = None - entity.parent_id = parent_id + else: + # Otherwise, merge the columns of the existing entity with those of + # the new entity and use the existing entity + entity = cls._merge_columns(entity, existing_entity) - return parent_id, parent + # Refresh the existing collection of children with the new/updated + # entity + cls._append_children(existing_parent, entity) + + # Recursively call this function to synchronize any parent entities up + # in the taxonomy + cls._sync_parent(session, existing_parent, new_entities, existing_entities) + return entity @staticmethod - def _remove_duplicate_children(entity: Entity, parent: Optional[Entity] = None): - if not parent: - return - - # Make sure that an entity has no duplicate entity IDs among its - # children - existing_child_index_by_id = None - if entity.id: - try: - existing_child_index_by_id = [e.id for e in parent.children].index( - entity.id - ) - parent.children.pop(existing_child_index_by_id) - except ValueError: - pass - - # Make sure that an entity has no duplicate entity keys among its - # children - existing_child_index_by_key = None + def get_parent(session: Session, entity: Entity) -> Optional[Entity]: + """ + Gets the parent of an entity, and fetches it if it's not available in + the current session. 
+ """ try: - existing_child_index_by_key = [e.entity_key for e in parent.children].index( - entity.entity_key + return entity.parent + except exc.DetachedInstanceError: + # Dirty fix for `Parent instance <...> is not bound to a Session; + # lazy load operation of attribute 'parent' cannot proceed` + return ( + session.query(Entity).get(entity.parent_id) + if entity.parent_id + else None ) - parent.children.pop(existing_child_index_by_key) - except ValueError: - pass - @classmethod - def _merge_columns(cls, entity: Entity, existing_entity: Entity) -> Entity: + @staticmethod + def _append_children(entity: Entity, *children: Entity): + """ + Update the list of children of a given entity with the given list of + entities. + + Note that, in case of ``entity_key`` conflict (the key of a new entity + already exists in the entity's children), the most recent version will + be used, so any column merge logic needs to happen before this method + is called. + """ + entity.children = list( + { + **{e.entity_key: e for e in entity.children}, + **{e.entity_key: e for e in children}, + }.values() + ) + + for child in children: + child.parent = entity + if entity.id: + child.parent_id = entity.id + + @staticmethod + def _merge_columns(entity: Entity, existing_entity: Entity) -> Entity: """ Merge two versions of an entity column by column. """ columns = [col.key for col in entity.columns] for col in columns: if col == 'meta': - existing_entity.meta = { - **(existing_entity.meta or {}), - **(entity.meta or {}), + existing_entity.meta = { # type: ignore + **(existing_entity.meta or {}), # type: ignore + **(entity.meta or {}), # type: ignore } elif col not in ('id', 'created_at'): setattr(existing_entity, col, getattr(entity, col))