# platypush/platypush/entities/_engine/repo/merger.py
# (original listing: Python, 199 lines, 6.7 KiB)

from typing import Iterable, List, Optional
from sqlalchemy.orm import Session, exc
from platypush.entities._base import Entity, EntityMapping
# pylint: disable=too-few-public-methods
class EntitiesMerger:
    """
    A stateless functor in charge of detecting and merging entities that
    already exist on the database before flushing the session.

    Entities are matched through their ``entity_key``. When a match is found,
    the columns of the incoming entity are copied over the known one, its
    children are merged recursively, and the known object is the one that is
    returned. Entities without a match are tracked as new entities.
    """

    def __call__(
        self,
        session: Session,
        entities: Iterable[Entity],
        existing_entities: Optional[EntityMapping] = None,
    ) -> List[Entity]:
        """
        Merge a set of entities with their existing representations, update
        the parent/child relationships and return a list containing
        ``[*updated_entities, *new_entities]``.

        :param session: Session used to fetch parents that can't be loaded
            from a detached instance (see :meth:`get_parent`).
        :param entities: Entities to be merged.
        :param existing_entities: Already known entities, indexed by
            ``entity_key``. The matching values are updated in place.
            Defaults to an empty mapping.
        :return: All the values of ``existing_entities`` (whether or not they
            were touched by this merge), followed by the entities that had no
            existing counterpart.
        """
        existing_entities = existing_entities or {}
        new_entities: EntityMapping = {}

        self._merge(
            session,
            entities,
            new_entities=new_entities,
            existing_entities=existing_entities,
        )

        return list({**existing_entities, **new_entities}.values())

    def _merge(
        self,
        session: Session,
        entities: Iterable[Entity],
        new_entities: EntityMapping,
        existing_entities: EntityMapping,
    ) -> List[Entity]:
        """
        (Recursive) inner implementation of the entity merge logic.

        :param session: Session forwarded to :meth:`_sync_parent`.
        :param entities: Entities to process at this level of the hierarchy.
        :param new_entities: Map (by ``entity_key``) of the entities seen so
            far that have no existing counterpart. Updated in place.
        :param existing_entities: Map (by ``entity_key``) of the already known
            entities. Its values are updated in place.
        :return: The processed entities, in input order, where any entity
            with a known counterpart is replaced by that counterpart.
        """
        processed_entities = []

        # The loop body rewires parent/child links: ``_sync_parent`` may reset
        # ``entity.parent`` and ``_append_children`` reassigns ``children``.
        # On recursion ``entities`` is a live ``children`` collection; if that
        # collection is kept in sync with ``parent`` (presumably the case for
        # the ORM relationship - confirm against the Entity model), those
        # writes would shrink it while it is being iterated and some siblings
        # would be skipped. Iterate over a snapshot instead.
        for entity in list(entities):
            key = entity.entity_key
            existing_entity = existing_entities.get(key, new_entities.get(key))

            # Synchronize the parent(s)
            entity = self._sync_parent(
                session, entity, new_entities, existing_entities
            )

            if existing_entity:
                # Merge the columns with those of the existing entity
                existing_entity = self._merge_columns(entity, existing_entity)

                # Merge the children
                self._append_children(
                    existing_entity,
                    *self._merge(
                        session,
                        entity.children,
                        new_entities,
                        existing_entities,
                    ),
                )

                # Use the existing entity now that it's been merged
                entity = existing_entity
            else:
                # Add it to the map of new entities if the entity doesn't
                # exist on the db
                new_entities[key] = entity

            processed_entities.append(entity)

        return processed_entities

    @classmethod
    def _sync_parent(
        cls,
        session: Session,
        entity: Entity,
        new_entities: EntityMapping,
        existing_entities: EntityMapping,
    ) -> Entity:
        """
        Recursively refresh the parent of an entity all the way up in the
        hierarchy, to make sure that all the parent/child relations are
        appropriately rewired and that all the relevant objects are added to
        this session.

        :param session: Session used by :meth:`get_parent`.
        :param entity: Entity whose parent chain should be synchronized.
        :param new_entities: Map of the new entities, by ``entity_key``.
        :param existing_entities: Map of the known entities, by
            ``entity_key``. It takes precedence over ``new_entities``.
        :return: ``entity`` itself, or the child of the cached parent that
            has the same ``entity_key`` (after the columns of ``entity`` have
            been merged into it).
        """
        parent = cls.get_parent(session, entity)
        if not parent:
            # No parent -> we can terminate the recursive climbing
            return entity

        # Check if an entity with the same key as the reported parent already
        # exists in the cached entities
        existing_parent = existing_entities.get(
            parent.entity_key, new_entities.get(parent.entity_key)
        )

        if not existing_parent:
            # No existing parent -> we need to flush the one reported by this
            # entity
            return entity

        # Check if the existing parent already has a child with the same key
        # as this entity
        existing_entity = next(
            (
                child
                for child in existing_parent.children
                if child.entity_key == entity.entity_key
            ),
            None,
        )

        if not existing_entity:
            # If this entity isn't currently a member of the existing parent,
            # temporarily reset the parent of the current entity, so we won't
            # carry stale objects around. We will soon rewire it to the
            # existing parent.
            entity.parent = None
        else:
            # Otherwise, merge the columns of the existing entity with those
            # of the new entity and use the existing entity
            entity = cls._merge_columns(entity, existing_entity)

        # Refresh the existing collection of children with the new/updated
        # entity
        cls._append_children(existing_parent, entity)

        # Recursively call this function to synchronize any parent entities up
        # in the taxonomy
        cls._sync_parent(session, existing_parent, new_entities, existing_entities)
        return entity

    @staticmethod
    def get_parent(session: Session, entity: Entity) -> Optional[Entity]:
        """
        Get the parent of an entity, fetching it through ``session`` if it
        can't be loaded from the entity itself.

        :param session: Session used for the fallback lookup.
        :param entity: Entity whose parent is requested.
        :return: The parent entity, or ``None`` if the entity has no parent.
        """
        try:
            return entity.parent
        except exc.DetachedInstanceError:
            # Dirty fix for `Parent instance <...> is not bound to a Session;
            # lazy load operation of attribute 'parent' cannot proceed`
            #
            # NOTE(review): ``Query.get()`` is a legacy API in SQLAlchemy
            # 1.4/2.0, where ``Session.get()`` is its replacement. Confirm the
            # minimum supported SQLAlchemy version before switching.
            return (
                session.query(Entity).get(entity.parent_id)
                if entity.parent_id
                else None
            )

    @staticmethod
    def _append_children(entity: Entity, *children: Entity):
        """
        Update the list of children of a given entity with the given list of
        entities.

        Note that, in case of ``entity_key`` conflict (the key of a new entity
        already exists in the entity's children), the most recent version will
        be used, so any column merge logic needs to happen before this method
        is called.

        :param entity: Entity whose ``children`` are replaced by the merged
            list.
        :param children: Entities to add. Their ``parent`` is set to
            ``entity``, and their ``parent_id`` too if ``entity`` has an
            ``id``.
        """
        # Index by key: a replaced child keeps the position of the one it
        # replaces, while unseen children are appended at the end.
        entity.children = list(
            {
                **{e.entity_key: e for e in entity.children},
                **{e.entity_key: e for e in children},
            }.values()
        )

        for child in children:
            child.parent = entity
            if entity.id:
                child.parent_id = entity.id

    @staticmethod
    def _merge_columns(entity: Entity, existing_entity: Entity) -> Entity:
        """
        Merge two versions of an entity column by column.

        ``meta`` is merged key by key (the values from ``entity`` win), ``id``
        and ``created_at`` are left untouched on the existing entity, and any
        other column is overwritten with the value from ``entity``.

        :param entity: Entity that carries the most recent values.
        :param existing_entity: Entity updated in place.
        :return: ``existing_entity``.
        """
        columns = [col.key for col in entity.columns]
        for col in columns:
            if col == 'meta':
                existing_entity.meta = {  # type: ignore
                    **(existing_entity.meta or {}),  # type: ignore
                    **(entity.meta or {}),  # type: ignore
                }
            elif col not in ('id', 'created_at'):
                setattr(existing_entity, col, getattr(entity, col))

        return existing_entity