Major improvements on the entities engine.

- Better logic to recursively link parent/children entities, so partial
  updates won't get lost.

- Removed `EntitiesCache` - it was too much to maintain while keeping
  consistent with the ORM, and it was a perennial fight against
  SQLAlchemy's own cache.

- Removed `EntityNotifier` - with no need to merge cached entities, the
  `notify` method has become much simpler and it's simply been merged
  in the `EntitiesRepository`.
This commit is contained in:
Fabio Manganiello 2023-02-22 02:53:45 +01:00
parent 9776921836
commit d8c429f4a8
Signed by: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 27 additions and 169 deletions

View file

@ -1,12 +1,11 @@
from logging import getLogger from logging import getLogger
from threading import Thread, Event from threading import Thread, Event
from platypush.context import get_bus
from platypush.entities import Entity from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent
from platypush.utils import set_thread_name from platypush.utils import set_thread_name
# pylint: disable=no-name-in-module
from platypush.entities._engine.notifier import EntityNotifier
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.queue import EntitiesQueue
from platypush.entities._engine.repo import EntitiesRepository from platypush.entities._engine.repo import EntitiesRepository
@ -38,7 +37,6 @@ class EntitiesEngine(Thread):
self._should_stop = Event() self._should_stop = Event()
self._queue = EntitiesQueue(stop_event=self._should_stop) self._queue = EntitiesQueue(stop_event=self._should_stop)
self._repo = EntitiesRepository() self._repo = EntitiesRepository()
self._notifier = EntityNotifier(self._repo._cache)
def post(self, *entities: Entity): def post(self, *entities: Entity):
self._queue.put(*entities) self._queue.put(*entities)
@ -50,6 +48,15 @@ class EntitiesEngine(Thread):
def stop(self): def stop(self):
self._should_stop.set() self._should_stop.set()
def notify(self, *entities: Entity):
"""
Trigger an EntityUpdateEvent if the entity has been persisted, or queue
it to the list of entities whose notifications will be flushed when the
session is committed.
"""
for entity in entities:
get_bus().post(EntityUpdateEvent(entity=entity))
def run(self): def run(self):
super().run() super().run()
set_thread_name('entities') set_thread_name('entities')
@ -61,10 +68,6 @@ class EntitiesEngine(Thread):
if not entities or self.should_stop: if not entities or self.should_stop:
continue continue
# Trigger/prepare EntityUpdateEvent objects
for entity in entities:
self._notifier.notify(entity)
# Store the batch of entities # Store the batch of entities
try: try:
entities = self._repo.save(*entities) entities = self._repo.save(*entities)
@ -73,7 +76,7 @@ class EntitiesEngine(Thread):
self.logger.exception(e) self.logger.exception(e)
continue continue
# Flush any pending notifications # Trigger EntityUpdateEvent events
self._notifier.flush(*entities) self.notify(*entities)
self.logger.info('Stopped entities engine') self.logger.info('Stopped entities engine')

View file

@ -1,49 +0,0 @@
from typing import Set, Tuple
from platypush.context import get_bus
from platypush.entities import Entity
from platypush.message.event.entities import EntityUpdateEvent
# pylint: disable=no-name-in-module
from platypush.entities._engine.repo.cache import EntitiesCache
class EntityNotifier:
"""
This object is in charge of forwarding EntityUpdateEvent instances on the
application bus when some entities are changed.
"""
def __init__(self, cache: EntitiesCache):
self._cache = cache
self._entities_awaiting_flush: Set[Tuple[str, str]] = set()
def _populate_entity_id_from_cache(self, new_entity: Entity):
cached_entity = self._cache.get(new_entity)
if cached_entity and cached_entity.id:
new_entity.id = cached_entity.id
if new_entity.id:
self._cache.update(new_entity)
def notify(self, entity: Entity):
"""
Trigger an EntityUpdateEvent if the entity has been persisted, or queue
it to the list of entities whose notifications will be flushed when the
session is committed.
"""
self._populate_entity_id_from_cache(entity)
if entity.id:
get_bus().post(EntityUpdateEvent(entity=entity))
else:
self._entities_awaiting_flush.add(entity.entity_key)
def flush(self, *entities: Entity):
"""
Flush and process any entities with pending EntityUpdateEvent
notifications.
"""
entities_awaiting_flush = {*self._entities_awaiting_flush}
for entity in entities:
key = entity.entity_key
if key in entities_awaiting_flush:
self.notify(entity)
self._entities_awaiting_flush.remove(key)

View file

@ -1,13 +1,10 @@
import logging import logging
from typing import Dict, Iterable, Tuple from typing import Dict, Iterable, Tuple
from sqlalchemy.orm import Session, make_transient from sqlalchemy.orm import Session
from platypush.entities import Entity from platypush.entities import Entity
# pylint: disable=no-name-in-module
from platypush.entities._engine.repo.cache import EntitiesCache
# pylint: disable=no-name-in-module # pylint: disable=no-name-in-module
from platypush.entities._engine.repo.db import EntitiesDb from platypush.entities._engine.repo.db import EntitiesDb
from platypush.entities._engine.repo.merger import EntitiesMerger from platypush.entities._engine.repo.merger import EntitiesMerger
@ -17,74 +14,31 @@ logger = logging.getLogger('entities')
class EntitiesRepository: class EntitiesRepository:
""" """
This object is used to get and save entities, and it wraps database and This object is used to get and save entities. It wraps the database
cache objects. connection.
""" """
def __init__(self): def __init__(self):
self._cache = EntitiesCache()
self._db = EntitiesDb() self._db = EntitiesDb()
self._merger = EntitiesMerger(self) self._merger = EntitiesMerger(self)
self._init_entities_cache()
def _init_entities_cache(self):
"""
Initializes the repository with the existing entities.
"""
logger.info('Initializing entities cache')
with self._db.get_session() as session:
entities = session.query(Entity).all()
for entity in entities:
make_transient(entity)
self._cache.update(*entities, overwrite=True)
logger.info('Entities cache initialized')
def get( def get(
self, session: Session, entities: Iterable[Entity], use_cache=True self, session: Session, entities: Iterable[Entity]
) -> Dict[Tuple[str, str], Entity]: ) -> Dict[Tuple[str, str], Entity]:
""" """
Given a set of entity objects, it returns those that already exist Given a set of entity objects, it returns those that already exist
(or have the same ``entity_key``). It looks up both the cache and the (or have the same ``entity_key``).
database.
""" """
existing_entities = {} return self._db.fetch(session, entities)
if not use_cache:
existing_entities = self._db.fetch(session, entities)
self._cache.update(*existing_entities.values())
else:
# Fetch the entities that exist in the cache
existing_entities = {
e.entity_key: self._cache.get(e) for e in entities if self._cache.get(e)
}
# Retrieve from the database the entities that miss from the cache
cache_miss_entities = {
e.entity_key: e
for e in entities
if e.entity_key not in existing_entities
}
cache_miss_existing_entities = self._db.fetch(
session, cache_miss_entities.values()
)
# Update the cache
self._cache.update(*cache_miss_existing_entities.values())
# Return the union of the cached + retrieved entities
existing_entities.update(cache_miss_existing_entities)
return existing_entities
def save(self, *entities: Entity) -> Iterable[Entity]: def save(self, *entities: Entity) -> Iterable[Entity]:
""" """
Perform an upsert of entities after merging duplicates and rebuilding Perform an upsert of entities after merging duplicates and rebuilding
the taxonomies. It updates both the database and the cache. the taxonomies.
""" """
with self._db.get_session(locked=True, autoflush=False) as session: with self._db.get_session(locked=True, autoflush=False) as session:
merged_entities = self._merger.merge(session, entities) merged_entities = self._merger.merge(session, entities)
merged_entities = self._db.upsert(session, merged_entities) merged_entities = self._db.upsert(session, merged_entities)
self._cache.update(*merged_entities, overwrite=True)
return merged_entities return merged_entities

View file

@ -1,53 +0,0 @@
from threading import RLock
from typing import Dict, Optional, Tuple
from platypush.entities import Entity
class EntitiesCache:
"""
An auxiliary class to model an entities lookup cache with multiple keys.
"""
def __init__(self):
self.by_id: Dict[str, Entity] = {}
self.by_external_id_and_plugin: Dict[Tuple[str, str], Entity] = {}
self._lock = RLock()
def get(self, entity: Entity) -> Optional[Entity]:
"""
Retrieve the cached representation of an entity, if it exists.
"""
if entity.id:
e = self.by_id.get(str(entity.id))
if e:
return e
if entity.external_id and entity.plugin:
e = self.by_external_id_and_plugin.get(
(str(entity.external_id), str(entity.plugin))
)
if e:
return e
return None
def update(self, *entities: Entity, overwrite=False):
"""
Update the cache with a list of new entities.
"""
with self._lock:
for entity in entities:
if not overwrite:
existing_entity = self.by_id.get(str(entity.id))
if existing_entity:
for k, v in existing_entity.to_json().items():
if getattr(entity, k, None) is None:
setattr(entity, k, v)
if entity.id:
self.by_id[str(entity.id)] = entity
if entity.external_id and entity.plugin:
self.by_external_id_and_plugin[
(str(entity.external_id), str(entity.plugin))
] = entity

View file

@ -49,7 +49,7 @@ class EntitiesMerger:
(Recursive) inner implementation of the entity merge logic. (Recursive) inner implementation of the entity merge logic.
""" """
processed_entities = [] processed_entities = []
existing_entities.update(self._repo.get(session, entities, use_cache=False)) existing_entities.update(self._repo.get(session, entities))
# Make sure that we have no duplicate entity keys in the current batch # Make sure that we have no duplicate entity keys in the current batch
entities = list( entities = list(
@ -79,6 +79,9 @@ class EntitiesMerger:
# Merge the other columns # Merge the other columns
self._merge_columns(entity, existing_entity) self._merge_columns(entity, existing_entity)
# Merge the children
self._merge(session, entity.children, new_entities, existing_entities)
# Use the updated version of the existing entity.
entity = existing_entity entity = existing_entity
else: else:
# Add it to the map of new entities if the entity doesn't exist # Add it to the map of new entities if the entity doesn't exist
@ -109,7 +112,7 @@ class EntitiesMerger:
# If there's no parent_id but there is a parent object, try to fetch # If there's no parent_id but there is a parent object, try to fetch
# its stored version # its stored version
if not parent_id and parent: if not parent_id and parent:
batch = list(self._repo.get(session, [parent], use_cache=False).values()) batch = list(self._repo.get(session, [parent]).values())
# If the parent is already stored, use its ID # If the parent is already stored, use its ID
if batch: if batch:
@ -135,7 +138,7 @@ class EntitiesMerger:
# flushing) # flushing)
if parent_id: if parent_id:
entity.parent = None entity.parent = None
entity.parent_id = parent_id # type: ignore entity.parent_id = parent_id
return parent_id, parent return parent_id, parent