Entities engine improvements

- Added cache support to prevent duplicate EntityUpdateEvents
- The cache is smartly pre-populated and kept up-to-date, so it's
  possible to trigger events as soon as the entities are published by
  the plugin (not only when the records are flushed to the internal db)
This commit is contained in:
Fabio Manganiello 2022-04-11 00:01:21 +02:00
parent 17615ff028
commit 67ff585f6c
Signed by: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,11 +1,11 @@
from logging import getLogger from logging import getLogger
from queue import Queue, Empty from queue import Queue, Empty
from threading import Thread, Event from threading import Thread, Event, RLock
from time import time from time import time
from typing import Iterable, List from typing import Iterable, List, Optional
from sqlalchemy import and_, or_ from sqlalchemy import and_, or_
from sqlalchemy.orm import Session from sqlalchemy.orm import Session, make_transient
from platypush.context import get_bus from platypush.context import get_bus
from platypush.message.event.entities import EntityUpdateEvent from platypush.message.event.entities import EntityUpdateEvent
@ -23,6 +23,98 @@ class EntitiesEngine(Thread):
self.logger = getLogger(name=obj_name) self.logger = getLogger(name=obj_name)
self._queue = Queue() self._queue = Queue()
self._should_stop = Event() self._should_stop = Event()
self._entities_cache_lock = RLock()
self._entities_cache = {
'by_id': {},
'by_external_id_and_plugin': {},
'by_name_and_plugin': {},
}
def _get_db(self):
    """
    Return the ``db`` plugin, which is used to open database sessions.

    ``get_plugin`` is imported inside the method rather than relying on a
    module-level name.

    :return: The object returned by ``get_plugin('db')``.
    :raises AssertionError: If ``get_plugin('db')`` returns a falsy value.
    """
    from platypush.context import get_plugin

    db = get_plugin('db')
    # Explicit check instead of a bare ``assert``: assert statements are
    # removed when Python runs with -O, which would let a missing plugin
    # slip through and fail later with a less obvious error. The exception
    # type is kept as AssertionError so callers observe the same failure.
    if not db:
        raise AssertionError('The db plugin is not available')

    return db
def _get_cached_entity(self, entity: Entity) -> Optional[dict]:
    """
    Look up the cached representation of an entity.

    The cache indexes are probed in this order, and the first non-empty
    hit is returned:

        1. ``by_id``, keyed by ``entity.id``
        2. ``by_external_id_and_plugin``, keyed by
           ``(entity.external_id, entity.plugin)``
        3. ``by_name_and_plugin``, keyed by ``(entity.name, entity.plugin)``

    An index is only probed when all the attributes of its key are truthy.

    :param entity: Entity to search for.
    :return: The cached dictionary, or ``None`` if no index has a match.
    """

    def candidate_lookups():
        # Lazily yields (index name, key) pairs, so the attributes of a
        # later key are only read if the earlier lookups missed.
        if entity.id:
            yield 'by_id', entity.id
        if entity.external_id and entity.plugin:
            yield (
                'by_external_id_and_plugin',
                (entity.external_id, entity.plugin),
            )
        if entity.name and entity.plugin:
            yield 'by_name_and_plugin', (entity.name, entity.plugin)

    for index_name, key in candidate_lookups():
        hit = self._entities_cache[index_name].get(key)
        if hit:
            return hit

    return None
@staticmethod
def _cache_repr(entity: Entity) -> dict:
    """
    Build the dictionary used to represent an entity inside the cache.

    The result is the output of ``entity.to_json()`` with the ``data``,
    ``meta``, ``created_at`` and ``updated_at`` keys removed, if present.
    The dictionary returned by ``to_json()`` is modified in place.

    :param entity: Entity to be represented.
    :return: The trimmed dictionary.
    """
    summary = entity.to_json()
    # NOTE(review): presumably these fields are excluded so that they do
    # not take part in the change detection - confirm.
    for excluded_key in ('data', 'meta', 'created_at', 'updated_at'):
        summary.pop(excluded_key, None)

    return summary
def _cache_entities(self, *entities: Entity, overwrite_cache=False):
    """
    Store the cache representation of the given entities under every
    index they can be looked up by.

    The same dictionary object is stored under ``by_id`` (if the entity
    has an id), ``by_external_id_and_plugin`` (if it has an external id
    and a plugin) and ``by_name_and_plugin`` (if it has a name and a
    plugin).

    This method does not acquire ``_entities_cache_lock`` itself.
    NOTE(review): the callers visible in this change hold the lock around
    the call - confirm for any other caller.

    :param entities: Entities to be cached.
    :param overwrite_cache: If False (default), the fields that are
        ``None`` in the new representation are filled with the values
        already cached under the same id, if any. If True, the new
        representation replaces the cached one as it is.
    """
    for entity in entities:
        record = self._cache_repr(entity)
        cache = self._entities_cache

        if not overwrite_cache:
            previous = cache['by_id'].get(entity.id)
            if previous:
                # Keep the previously cached values for the fields that
                # the new representation leaves unset.
                record.update(
                    {
                        field: value
                        for field, value in previous.items()
                        if record.get(field) is None
                    }
                )

        if entity.id:
            cache['by_id'][entity.id] = record

        if entity.external_id and entity.plugin:
            external_key = (entity.external_id, entity.plugin)
            cache['by_external_id_and_plugin'][external_key] = record

        if entity.name and entity.plugin:
            name_key = (entity.name, entity.plugin)
            cache['by_name_and_plugin'][name_key] = record
def _entity_has_changes(self, new_entity: Entity) -> bool:
    """
    Check whether an entity differs from its cached representation.

    The whole check runs while holding ``_entities_cache_lock``.

    Side effects:

        - If a cached representation with an id is found, that id is
          assigned to ``new_entity.id``.
        - If the entity is reported as changed and it has an id, it is
          stored in the cache through ``_cache_entities``.

    :param new_entity: Entity to be checked.
    :return: False if a cached representation exists and it is equal to
        the cache representation of ``new_entity``, True otherwise.
    """
    with self._entities_cache_lock:
        known = self._get_cached_entity(new_entity)
        if known and known.get('id'):
            # Reuse the cached id before comparing the representations
            new_entity.id = known['id']

        unchanged = bool(known) and known == self._cache_repr(new_entity)
        if not unchanged and new_entity.id:
            self._cache_entities(new_entity)

    return not unchanged
def _init_entities_cache(self):
    """
    Fill the entities cache with all the entities returned by the database.

    Every ``Entity`` record is queried, passed through ``make_transient``
    and then stored in the cache with ``overwrite_cache=True`` while
    holding ``_entities_cache_lock``. An info message is logged at the end.
    """
    db = self._get_db()
    with db.get_session() as session:
        stored_entities = session.query(Entity).all()
        # make_transient removes the association between an object and
        # its session.
        # NOTE(review): the loop is assumed to run while the session is
        # still open - confirm against the original indentation.
        for stored_entity in stored_entities:
            make_transient(stored_entity)

    with self._entities_cache_lock:
        self._cache_entities(*stored_entities, overwrite_cache=True)

    self.logger.info('Entities cache initialized')
def _process_event(self, entity: Entity):
    """
    Post an ``EntityUpdateEvent`` on the bus for the given entity, but
    only if ``_entity_has_changes`` reports a change.

    :param entity: Entity that has been published.
    """
    if not self._entity_has_changes(entity):
        return

    get_bus().post(EntityUpdateEvent(entity=entity))
def post(self, *entities: Entity): def post(self, *entities: Entity):
for entity in entities: for entity in entities:
@ -38,6 +130,7 @@ class EntitiesEngine(Thread):
def run(self): def run(self):
super().run() super().run()
self.logger.info('Started entities engine') self.logger.info('Started entities engine')
self._init_entities_cache()
while not self.should_stop: while not self.should_stop:
msgs = [] msgs = []
@ -53,6 +146,9 @@ class EntitiesEngine(Thread):
if msg: if msg:
msgs.append(msg) msgs.append(msg)
# Trigger an EntityUpdateEvent if there has
# been a change on the entity state
self._process_event(msg)
if not msgs or self.should_stop: if not msgs or self.should_stop:
continue continue
@ -126,13 +222,12 @@ class EntitiesEngine(Thread):
return new_entities return new_entities
def _process_entities(self, *entities: Entity): def _process_entities(self, *entities: Entity):
from platypush.context import get_plugin with self._get_db().get_session() as session:
with get_plugin('db').get_session() as session: # type: ignore
existing_entities = self._get_if_exist(session, entities) existing_entities = self._get_if_exist(session, entities)
entities = self._merge_entities(entities, existing_entities) # type: ignore entities = self._merge_entities(entities, existing_entities) # type: ignore
session.add_all(entities) session.add_all(entities)
session.commit() session.commit()
with self._entities_cache_lock:
for entity in entities: for entity in entities:
get_bus().post(EntityUpdateEvent(entity=entity)) self._cache_entities(entity, overwrite_cache=True)