diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index 19ead972..505d7a70 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -1,12 +1,11 @@ from logging import getLogger from threading import Thread, Event +from platypush.context import get_bus from platypush.entities import Entity +from platypush.message.event.entities import EntityUpdateEvent from platypush.utils import set_thread_name -# pylint: disable=no-name-in-module -from platypush.entities._engine.notifier import EntityNotifier - # pylint: disable=no-name-in-module from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.repo import EntitiesRepository @@ -38,7 +37,6 @@ class EntitiesEngine(Thread): self._should_stop = Event() self._queue = EntitiesQueue(stop_event=self._should_stop) self._repo = EntitiesRepository() - self._notifier = EntityNotifier(self._repo._cache) def post(self, *entities: Entity): self._queue.put(*entities) @@ -50,6 +48,15 @@ class EntitiesEngine(Thread): def stop(self): self._should_stop.set() + def notify(self, *entities: Entity): + """ + Trigger an EntityUpdateEvent if the entity has been persisted, or queue + it to the list of entities whose notifications will be flushed when the + session is committed. + """ + for entity in entities: + get_bus().post(EntityUpdateEvent(entity=entity)) + def run(self): super().run() set_thread_name('entities') @@ -61,10 +68,6 @@ class EntitiesEngine(Thread): if not entities or self.should_stop: continue - # Trigger/prepare EntityUpdateEvent objects - for entity in entities: - self._notifier.notify(entity) - # Store the batch of entities try: entities = self._repo.save(*entities) @@ -73,7 +76,7 @@ class EntitiesEngine(Thread): self.logger.exception(e) continue - # Flush any pending notifications - self._notifier.flush(*entities) + # Trigger EntityUpdateEvent events + self.notify(*entities) self.logger.info('Stopped entities engine') diff --git a/platypush/entities/_engine/notifier.py b/platypush/entities/_engine/notifier.py deleted file mode 100644 index 92c32ec9..00000000 --- a/platypush/entities/_engine/notifier.py +++ /dev/null @@ -1,49 +0,0 @@ -from typing import Set, Tuple -from platypush.context import get_bus -from platypush.entities import Entity -from platypush.message.event.entities import EntityUpdateEvent - -# pylint: disable=no-name-in-module -from platypush.entities._engine.repo.cache import EntitiesCache - - -class EntityNotifier: - """ - This object is in charge of forwarding EntityUpdateEvent instances on the - application bus when some entities are changed. - """ - - def __init__(self, cache: EntitiesCache): - self._cache = cache - self._entities_awaiting_flush: Set[Tuple[str, str]] = set() - - def _populate_entity_id_from_cache(self, new_entity: Entity): - cached_entity = self._cache.get(new_entity) - if cached_entity and cached_entity.id: - new_entity.id = cached_entity.id - if new_entity.id: - self._cache.update(new_entity) - - def notify(self, entity: Entity): - """ - Trigger an EntityUpdateEvent if the entity has been persisted, or queue - it to the list of entities whose notifications will be flushed when the - session is committed. - """ - self._populate_entity_id_from_cache(entity) - if entity.id: - get_bus().post(EntityUpdateEvent(entity=entity)) - else: - self._entities_awaiting_flush.add(entity.entity_key) - - def flush(self, *entities: Entity): - """ - Flush and process any entities with pending EntityUpdateEvent - notifications. - """ - entities_awaiting_flush = {*self._entities_awaiting_flush} - for entity in entities: - key = entity.entity_key - if key in entities_awaiting_flush: - self.notify(entity) - self._entities_awaiting_flush.remove(key) diff --git a/platypush/entities/_engine/repo/__init__.py b/platypush/entities/_engine/repo/__init__.py index 73be43f7..1bf38131 100644 --- a/platypush/entities/_engine/repo/__init__.py +++ b/platypush/entities/_engine/repo/__init__.py @@ -1,13 +1,10 @@ import logging from typing import Dict, Iterable, Tuple -from sqlalchemy.orm import Session, make_transient +from sqlalchemy.orm import Session from platypush.entities import Entity -# pylint: disable=no-name-in-module -from platypush.entities._engine.repo.cache import EntitiesCache - # pylint: disable=no-name-in-module from platypush.entities._engine.repo.db import EntitiesDb from platypush.entities._engine.repo.merger import EntitiesMerger @@ -17,74 +14,31 @@ logger = logging.getLogger('entities') class EntitiesRepository: """ - This object is used to get and save entities, and it wraps database and - cache objects. + This object is used to get and save entities. It wraps the database + connection. """ def __init__(self): - self._cache = EntitiesCache() self._db = EntitiesDb() self._merger = EntitiesMerger(self) - self._init_entities_cache() - - def _init_entities_cache(self): - """ - Initializes the repository with the existing entities. - """ - logger.info('Initializing entities cache') - with self._db.get_session() as session: - entities = session.query(Entity).all() - for entity in entities: - make_transient(entity) - - self._cache.update(*entities, overwrite=True) - logger.info('Entities cache initialized') def get( - self, session: Session, entities: Iterable[Entity], use_cache=True + self, session: Session, entities: Iterable[Entity] ) -> Dict[Tuple[str, str], Entity]: """ Given a set of entity objects, it returns those that already exist - (or have the same ``entity_key``). It looks up both the cache and the - database. + (or have the same ``entity_key``). """ - existing_entities = {} - if not use_cache: - existing_entities = self._db.fetch(session, entities) - self._cache.update(*existing_entities.values()) - else: - # Fetch the entities that exist in the cache - existing_entities = { - e.entity_key: self._cache.get(e) for e in entities if self._cache.get(e) - } - - # Retrieve from the database the entities that miss from the cache - cache_miss_entities = { - e.entity_key: e - for e in entities - if e.entity_key not in existing_entities - } - - cache_miss_existing_entities = self._db.fetch( - session, cache_miss_entities.values() - ) - - # Update the cache - self._cache.update(*cache_miss_existing_entities.values()) - - # Return the union of the cached + retrieved entities - existing_entities.update(cache_miss_existing_entities) - - return existing_entities + return self._db.fetch(session, entities) def save(self, *entities: Entity) -> Iterable[Entity]: """ Perform an upsert of entities after merging duplicates and rebuilding - the taxonomies. It updates both the database and the cache. + the taxonomies. """ + with self._db.get_session(locked=True, autoflush=False) as session: merged_entities = self._merger.merge(session, entities) merged_entities = self._db.upsert(session, merged_entities) - self._cache.update(*merged_entities, overwrite=True) return merged_entities diff --git a/platypush/entities/_engine/repo/cache.py b/platypush/entities/_engine/repo/cache.py deleted file mode 100644 index c38131a3..00000000 --- a/platypush/entities/_engine/repo/cache.py +++ /dev/null @@ -1,53 +0,0 @@ -from threading import RLock -from typing import Dict, Optional, Tuple - -from platypush.entities import Entity - - -class EntitiesCache: - """ - An auxiliary class to model an entities lookup cache with multiple keys. - """ - - def __init__(self): - self.by_id: Dict[str, Entity] = {} - self.by_external_id_and_plugin: Dict[Tuple[str, str], Entity] = {} - self._lock = RLock() - - def get(self, entity: Entity) -> Optional[Entity]: - """ - Retrieve the cached representation of an entity, if it exists. - """ - if entity.id: - e = self.by_id.get(str(entity.id)) - if e: - return e - - if entity.external_id and entity.plugin: - e = self.by_external_id_and_plugin.get( - (str(entity.external_id), str(entity.plugin)) - ) - if e: - return e - - return None - - def update(self, *entities: Entity, overwrite=False): - """ - Update the cache with a list of new entities. - """ - with self._lock: - for entity in entities: - if not overwrite: - existing_entity = self.by_id.get(str(entity.id)) - if existing_entity: - for k, v in existing_entity.to_json().items(): - if getattr(entity, k, None) is None: - setattr(entity, k, v) - - if entity.id: - self.by_id[str(entity.id)] = entity - if entity.external_id and entity.plugin: - self.by_external_id_and_plugin[ - (str(entity.external_id), str(entity.plugin)) - ] = entity diff --git a/platypush/entities/_engine/repo/merger.py b/platypush/entities/_engine/repo/merger.py index 601f07d6..90b98714 100644 --- a/platypush/entities/_engine/repo/merger.py +++ b/platypush/entities/_engine/repo/merger.py @@ -49,7 +49,7 @@ class EntitiesMerger: (Recursive) inner implementation of the entity merge logic. """ processed_entities = [] - existing_entities.update(self._repo.get(session, entities, use_cache=False)) + existing_entities.update(self._repo.get(session, entities)) # Make sure that we have no duplicate entity keys in the current batch entities = list( @@ -79,6 +79,9 @@ class EntitiesMerger: # Merge the other columns self._merge_columns(entity, existing_entity) + # Merge the children + self._merge(session, entity.children, new_entities, existing_entities) + # Use the updated version of the existing entity. entity = existing_entity else: # Add it to the map of new entities if the entity doesn't exist @@ -109,7 +112,7 @@ class EntitiesMerger: # If there's no parent_id but there is a parent object, try to fetch # its stored version if not parent_id and parent: - batch = list(self._repo.get(session, [parent], use_cache=False).values()) + batch = list(self._repo.get(session, [parent]).values()) # If the parent is already stored, use its ID if batch: @@ -135,7 +138,7 @@ class EntitiesMerger: # flushing) if parent_id: entity.parent = None - entity.parent_id = parent_id # type: ignore + entity.parent_id = parent_id return parent_id, parent