From 67ff585f6c296c692afb8a1a6c1a7367ac36443d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 11 Apr 2022 00:01:21 +0200 Subject: [PATCH] Entities engine improvements - Added cache support to prevent duplicate EntityUpdateEvents - The cache is smartly pre-populated and kept up-to-date, so it's possible to trigger events as soon as the entities are published by the plugin (not only when the records are flushed to the internal db) --- platypush/entities/_engine.py | 111 +++++++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 8 deletions(-) diff --git a/platypush/entities/_engine.py b/platypush/entities/_engine.py index 63d9b95be..bbb0bb6c9 100644 --- a/platypush/entities/_engine.py +++ b/platypush/entities/_engine.py @@ -1,11 +1,11 @@ from logging import getLogger from queue import Queue, Empty -from threading import Thread, Event +from threading import Thread, Event, RLock from time import time -from typing import Iterable, List +from typing import Iterable, List, Optional from sqlalchemy import and_, or_ -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, make_transient from platypush.context import get_bus from platypush.message.event.entities import EntityUpdateEvent @@ -23,6 +23,98 @@ class EntitiesEngine(Thread): self.logger = getLogger(name=obj_name) self._queue = Queue() self._should_stop = Event() + self._entities_cache_lock = RLock() + self._entities_cache = { + 'by_id': {}, + 'by_external_id_and_plugin': {}, + 'by_name_and_plugin': {}, + } + + def _get_db(self): + from platypush.context import get_plugin + + db = get_plugin('db') + assert db + return db + + def _get_cached_entity(self, entity: Entity) -> Optional[dict]: + if entity.id: + e = self._entities_cache['by_id'].get(entity.id) + if e: + return e + + if entity.external_id and entity.plugin: + e = self._entities_cache['by_external_id_and_plugin'].get( + (entity.external_id, entity.plugin) + ) + if e: + return e + + if entity.name and entity.plugin: + e = self._entities_cache['by_name_and_plugin'].get( + (entity.name, entity.plugin) + ) + if e: + return e + + @staticmethod + def _cache_repr(entity: Entity) -> dict: + repr_ = entity.to_json() + repr_.pop('data', None) + repr_.pop('meta', None) + repr_.pop('created_at', None) + repr_.pop('updated_at', None) + return repr_ + + def _cache_entities(self, *entities: Entity, overwrite_cache=False): + for entity in entities: + e = self._cache_repr(entity) + if not overwrite_cache: + existing_entity = self._entities_cache['by_id'].get(entity.id) + if existing_entity: + for k, v in existing_entity.items(): + if e.get(k) is None: + e[k] = v + + if entity.id: + self._entities_cache['by_id'][entity.id] = e + if entity.external_id and entity.plugin: + self._entities_cache['by_external_id_and_plugin'][ + (entity.external_id, entity.plugin) + ] = e + if entity.name and entity.plugin: + self._entities_cache['by_name_and_plugin'][ + (entity.name, entity.plugin) + ] = e + + def _entity_has_changes(self, new_entity: Entity) -> bool: + with self._entities_cache_lock: + cached_entity = self._get_cached_entity(new_entity) + if cached_entity: + if cached_entity.get('id'): + new_entity.id = cached_entity['id'] + if cached_entity == self._cache_repr(new_entity): + return False + + if new_entity.id: + self._cache_entities(new_entity) + + return True + + def _init_entities_cache(self): + with self._get_db().get_session() as session: + entities = session.query(Entity).all() + for entity in entities: + make_transient(entity) + + with self._entities_cache_lock: + self._cache_entities(*entities, overwrite_cache=True) + + self.logger.info('Entities cache initialized') + + def _process_event(self, entity: Entity): + if self._entity_has_changes(entity): + get_bus().post(EntityUpdateEvent(entity=entity)) def post(self, *entities: Entity): for entity in entities: @@ -38,6 +130,7 @@ class EntitiesEngine(Thread): def run(self): super().run() self.logger.info('Started entities engine') + self._init_entities_cache() while not self.should_stop: msgs = [] @@ -53,6 +146,9 @@ class EntitiesEngine(Thread): if msg: msgs.append(msg) + # Trigger an EntityUpdateEvent if there has + # been a change on the entity state + self._process_event(msg) if not msgs or self.should_stop: continue @@ -126,13 +222,12 @@ class EntitiesEngine(Thread): return new_entities def _process_entities(self, *entities: Entity): - from platypush.context import get_plugin - - with get_plugin('db').get_session() as session: # type: ignore + with self._get_db().get_session() as session: existing_entities = self._get_if_exist(session, entities) entities = self._merge_entities(entities, existing_entities) # type: ignore session.add_all(entities) session.commit() - for entity in entities: - get_bus().post(EntityUpdateEvent(entity=entity)) + with self._entities_cache_lock: + for entity in entities: + self._cache_entities(entity, overwrite_cache=True)