Big rewrite/refactor of the entities merger

This commit is contained in:
Fabio Manganiello 2023-03-19 12:40:48 +01:00
parent 2411b961e8
commit 878fe91155
Signed by: blacklight
GPG key ID: D90FBA7F76362774
6 changed files with 193 additions and 126 deletions

View file

@ -5,6 +5,7 @@ from typing import Collection, Optional
from ._base import (
Entity,
EntityKey,
EntitySavedCallback,
get_entities_registry,
init_entities_db,
@ -80,6 +81,7 @@ __all__ = (
'DimmerEntityManager',
'EntitiesEngine',
'Entity',
'EntityKey',
'EntityManager',
'EntitySavedCallback',
'EnumSwitchEntityManager',

View file

@ -27,6 +27,11 @@ from platypush.message import JSONAble
EntityRegistryType = Dict[str, Type['Entity']]
entities_registry: EntityRegistryType = {}
EntityKey = Tuple[str, str]
""" The entity's logical key, as an ``<external_id, plugin>`` tuple. """
EntityMapping = Dict[EntityKey, 'Entity']
""" Internal mapping for entities used for deduplication/merge/upsert. """
_import_error_ignored_modules: Final[Set[str]] = {'bluetooth'}
"""
ImportError exceptions will be ignored for these entity submodules when
@ -110,7 +115,7 @@ if 'entity' not in Base.metadata:
return tuple(inspector.mapper.column_attrs)
@property
def entity_key(self) -> Tuple[str, str]:
def entity_key(self) -> EntityKey:
"""
This method returns the "external" key of an entity.
"""

View file

@ -1,13 +1,13 @@
from logging import getLogger
from threading import Thread, Event
from typing import Dict, Optional, Tuple
from typing import Dict, Optional
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent
from platypush.utils import set_thread_name
from platypush.entities._base import EntitySavedCallback
from platypush.entities._base import EntityKey, EntitySavedCallback
from platypush.entities._engine.queue import EntitiesQueue
from platypush.entities._engine.repo import EntitiesRepository
@ -46,7 +46,7 @@ class EntitiesEngine(Thread):
""" Queue where all entity upsert requests are received."""
self._repo = EntitiesRepository()
""" The repository of the processed entities. """
self._callbacks: Dict[Tuple[str, str], EntitySavedCallback] = {}
self._callbacks: Dict[EntityKey, EntitySavedCallback] = {}
""" (external_id, plugin) -> callback mapping"""
def post(self, *entities: Entity, callback: Optional[EntitySavedCallback] = None):

View file

@ -1,9 +1,9 @@
import logging
from typing import Dict, Iterable, Tuple
from typing import Dict, Iterable, Optional, Tuple
from sqlalchemy.orm import Session
from platypush.entities import Entity
from platypush.entities._base import Entity, EntityMapping
# pylint: disable=no-name-in-module
from platypush.entities._engine.repo.db import EntitiesDb
@ -20,7 +20,7 @@ class EntitiesRepository:
def __init__(self):
self._db = EntitiesDb()
self._merger = EntitiesMerger(self)
self._merge = EntitiesMerger()
def get(
self, session: Session, entities: Iterable[Entity]
@ -43,7 +43,63 @@ class EntitiesRepository:
autocommit=False,
expire_on_commit=False,
) as session:
merged_entities = self._merger.merge(session, entities)
merged_entities = self._merge(
session,
entities,
existing_entities=self._fetch_all_and_flatten(session, entities),
)
merged_entities = self._db.upsert(session, merged_entities)
return merged_entities
def _fetch_all_and_flatten(
    self,
    session: Session,
    entities: Iterable[Entity],
) -> EntityMapping:
    """
    Flatten the hierarchies that the given entities belong to and look up
    the resulting objects through :meth:`get`.

    For each input entity, both the tree hanging off its root ancestor and
    the tree hanging off the entity itself are expanded into a single
    ``entity_key -> entity`` mapping. The root's tree is expanded first, so
    on key conflicts the objects reachable from the provided entity win.

    :param session: Session used to resolve parents and passed to
        :meth:`get`.
    :param entities: Entities whose hierarchies should be expanded.
    :return: Whatever :meth:`get` returns for the flattened collection
        (annotated as an ``entity_key -> entity`` mapping).
    """
    flattened = {}

    for entity in entities:
        root = self._get_root_entity(session, entity)
        # Order matters: root subtree first, then the entity's own subtree,
        # so the latter overrides the former on duplicate keys.
        for subtree_head in (root, entity):
            flattened.update(self._expand_children([subtree_head]))

    return self.get(session, flattened.values())
@classmethod
def _expand_children(
    cls,
    entities: Iterable[Entity],
    all_entities: Optional[EntityMapping] = None,
) -> EntityMapping:
    """
    Recursively expand and flatten all the children of a set of entities
    into an ``entity_key -> entity`` mapping.

    Entities are visited depth-first, parents before their children. If the
    same ``entity_key`` is met more than once, the last visited object
    replaces the previous one.

    :param entities: Entities to expand. Each one is expected to expose
        ``entity_key`` and ``children``.
    :param all_entities: Optional accumulator that is filled in place. A
        new mapping is created only when this is ``None``.
    :return: The accumulator, containing the given entities and all of
        their descendants.
    """
    # Check against None explicitly: an empty mapping passed by the caller
    # is a valid accumulator and must be populated in place rather than
    # being silently replaced by a fresh dict.
    if all_entities is None:
        all_entities = {}

    for entity in entities:
        all_entities[entity.entity_key] = entity
        cls._expand_children(entity.children, all_entities)

    return all_entities
def _get_root_entity(self, session: Session, entity: Entity) -> Entity:
    """
    Climb the hierarchy of an entity until an ancestor without a parent is
    found, and return that ancestor.

    Parents are resolved through ``self._merge.get_parent``.

    :param session: Session passed to the parent lookup.
    :param entity: Entity to start climbing from.
    :return: The topmost ancestor, or ``entity`` itself if it has no parent
        (or if it is falsy).
    """
    # Nothing to climb from
    if not entity:
        return entity

    while True:
        ancestor = self._merge.get_parent(session, entity)
        if not ancestor:
            # No parent above this node -> this is the root
            return entity

        entity = ancestor

View file

@ -6,7 +6,7 @@ from sqlalchemy import and_, or_
from sqlalchemy.orm import Session
from platypush.context import get_plugin
from platypush.entities import Entity
from platypush.entities._base import Entity
@dataclass

View file

@ -1,34 +1,30 @@
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Iterable, List, Optional
from sqlalchemy.orm import Session, exc
from platypush.entities import Entity
from platypush.entities._base import Entity, EntityMapping
# pylint: disable=too-few-public-methods
class EntitiesMerger:
"""
This object is in charge of detecting and merging entities that already
exist on the database before flushing the session.
A stateless functor in charge of detecting and merging entities that
already exist on the database before flushing the session.
"""
def __init__(self, repository):
from . import EntitiesRepository
self._repo: EntitiesRepository = repository
def merge(
def __call__(
self,
session: Session,
entities: Iterable[Entity],
existing_entities: Optional[EntityMapping] = None,
) -> List[Entity]:
"""
Merge a set of entities with their existing representations and update
the parent/child relationships and return a tuple with
``[new_entities, updated_entities]``.
the parent/child relationships and return a list containing
``[*updated_entities, *new_entities]``.
"""
new_entities: Dict[Tuple[str, str], Entity] = {}
existing_entities: Dict[Tuple[str, str], Entity] = {}
existing_entities = existing_entities or {}
new_entities: EntityMapping = {}
self._merge(
session,
@ -37,156 +33,164 @@ class EntitiesMerger:
existing_entities=existing_entities,
)
return [*existing_entities.values(), *new_entities.values()]
return list({**existing_entities, **new_entities}.values())
def _merge(
self,
session: Session,
entities: Iterable[Entity],
new_entities: Dict[Tuple[str, str], Entity],
existing_entities: Dict[Tuple[str, str], Entity],
new_entities: EntityMapping,
existing_entities: EntityMapping,
) -> List[Entity]:
"""
(Recursive) inner implementation of the entity merge logic.
"""
processed_entities = []
existing_entities.update(self._repo.get(session, entities))
# Make sure that we have no duplicate entity keys in the current batch
entities = list(
{
**({e.entity_key: e for e in entities}),
**(
{
e.entity_key: e
for e in {str(ee.id): ee for ee in entities if ee.id}.values()
}
),
}.values()
)
# Retrieve existing records and merge them
for entity in entities:
key = entity.entity_key
existing_entity = existing_entities.get(key, new_entities.get(key))
parent_id, parent = self._update_parent(session, entity, new_entities)
# Synchronize the parent(s)
entity = self._sync_parent(session, entity, new_entities, existing_entities)
if existing_entity:
# Update the parent
if not parent_id and parent:
existing_entity.parent = parent
else:
existing_entity.parent_id = parent_id
# Merge the other columns
self._merge_columns(entity, existing_entity)
# Merge the columns with those of the existing entity
existing_entity = self._merge_columns(entity, existing_entity)
# Merge the children
self._merge(session, entity.children, new_entities, existing_entities)
# Use the updated version of the existing entity.
self._append_children(
existing_entity,
*self._merge(
session,
entity.children,
new_entities,
existing_entities,
)
)
# Use the existing entity now that it's been merged
entity = existing_entity
else:
# Add it to the map of new entities if the entity doesn't exist
# on the repo
# Add it to the map of new entities if the entity doesn't exist on the db
new_entities[key] = entity
processed_entities.append(entity)
return processed_entities
def _update_parent(
self,
@classmethod
def _sync_parent(
cls,
session: Session,
entity: Entity,
new_entities: Dict[Tuple[str, str], Entity],
) -> Tuple[Optional[int], Optional[Entity]]:
new_entities: EntityMapping,
existing_entities: EntityMapping,
) -> Entity:
"""
Recursively update the hierarchy of an entity, moving upwards towards
the parent.
Recursively refresh the parent of an entity all the way up in the
hierarchy, to make sure that all the parent/child relations are
appropriately rewired and that all the relevant objects are added to
this session.
"""
parent_id: Optional[int] = entity.parent_id
try:
parent: Optional[Entity] = entity.parent
except exc.DetachedInstanceError:
# Dirty fix for `Parent instance <...> is not bound to a Session;
# lazy load operation of attribute 'parent' cannot proceed
parent = session.query(Entity).get(parent_id) if parent_id else None
parent = cls.get_parent(session, entity)
if not parent:
# No parent -> we can terminate the recursive climbing
return entity
# If the entity has a parent with an ID, use that
if parent and parent.id:
parent_id = parent_id or parent.id
# Check if an entity with the same key as the reported parent already
# exists in the cached entities
existing_parent = existing_entities.get(
parent.entity_key, new_entities.get(parent.entity_key)
)
# If there's no parent_id but there is a parent object, try to fetch
# its stored version
if not parent_id and parent:
batch = list(self._repo.get(session, [parent]).values())
if not existing_parent:
# No existing parent -> we need to flush the one reported by this
# entity
return entity
# If the parent is already stored, use its ID
if batch:
parent = batch[0]
parent_id = parent.id
# Check if the existing parent already has a child with the same key as
# this entity
existing_entity = next(
iter(
child
for child in existing_parent.children
if child.entity_key == entity.entity_key
),
None,
)
# Otherwise, check if its key is already among those awaiting flush
# and reuse the same objects (prevents SQLAlchemy from generating
# duplicate inserts)
else:
temp_entity = new_entities.get(parent.entity_key)
if temp_entity:
self._remove_duplicate_children(entity, temp_entity)
parent = entity.parent = temp_entity
else:
new_entities[parent.entity_key] = parent
# Recursively apply any changes up in the hierarchy
self._update_parent(session, parent, new_entities=new_entities)
# If we found a parent_id, populate it on the entity (and remove the
# supporting relationship object so SQLAlchemy doesn't go nuts when
# flushing)
if parent_id:
if not existing_entity:
# If this entity isn't currently a member of the existing parent,
# temporarily reset the parent of the current entity, so we won't
# carry stale objects around. We will soon rewire it to the
# existing parent.
entity.parent = None
entity.parent_id = parent_id
else:
# Otherwise, merge the columns of the existing entity with those of
# the new entity and use the existing entity
entity = cls._merge_columns(entity, existing_entity)
return parent_id, parent
# Refresh the existing collection of children with the new/updated
# entity
cls._append_children(existing_parent, entity)
# Recursively call this function to synchronize any parent entities up
# in the taxonomy
cls._sync_parent(session, existing_parent, new_entities, existing_entities)
return entity
@staticmethod
def _remove_duplicate_children(entity: Entity, parent: Optional[Entity] = None):
if not parent:
return
# Make sure that an entity has no duplicate entity IDs among its
# children
existing_child_index_by_id = None
if entity.id:
try:
existing_child_index_by_id = [e.id for e in parent.children].index(
entity.id
)
parent.children.pop(existing_child_index_by_id)
except ValueError:
pass
# Make sure that an entity has no duplicate entity keys among its
# children
existing_child_index_by_key = None
def get_parent(session: Session, entity: Entity) -> Optional[Entity]:
"""
Gets the parent of an entity, and it fetches if it's not available in
the current session.
"""
try:
existing_child_index_by_key = [e.entity_key for e in parent.children].index(
entity.entity_key
return entity.parent
except exc.DetachedInstanceError:
# Dirty fix for `Parent instance <...> is not bound to a Session;
# lazy load operation of attribute 'parent' cannot proceed`
return (
session.query(Entity).get(entity.parent_id)
if entity.parent_id
else None
)
parent.children.pop(existing_child_index_by_key)
except ValueError:
pass
@classmethod
def _merge_columns(cls, entity: Entity, existing_entity: Entity) -> Entity:
@staticmethod
def _append_children(entity: Entity, *children: Entity):
    """
    Add the given entities to the children of ``entity`` and point each of
    them back at it.

    Children are de-duplicated by ``entity_key``. When a key is both among
    the current children and among the provided ones, the provided object
    replaces the current one (keeping its position in the list), so any
    column merge logic must run before this method is called.

    :param entity: The parent whose ``children`` collection is rebuilt.
    :param children: Entities to attach. Their ``parent`` is always set to
        ``entity``, and ``parent_id`` is set too if ``entity.id`` is truthy.
    """
    # Start from the current children, then overlay the provided ones
    children_by_key = {
        current.entity_key: current for current in entity.children
    }
    children_by_key.update((added.entity_key, added) for added in children)
    entity.children = list(children_by_key.values())

    for added in children:
        added.parent = entity
        if entity.id:
            added.parent_id = entity.id
@staticmethod
def _merge_columns(entity: Entity, existing_entity: Entity) -> Entity:
"""
Merge two versions of an entity column by column.
"""
columns = [col.key for col in entity.columns]
for col in columns:
if col == 'meta':
existing_entity.meta = {
**(existing_entity.meta or {}),
**(entity.meta or {}),
existing_entity.meta = { # type: ignore
**(existing_entity.meta or {}), # type: ignore
**(entity.meta or {}), # type: ignore
}
elif col not in ('id', 'created_at'):
setattr(existing_entity, col, getattr(entity, col))