EntitiesEngine synchronization improvements.

- Added `wait_start()` method that other threads can use to synchronize
  with the engine and wait before performing db operations.

- Callback logic wrapped in a try/except block to prevent custom
  integrations with buggy callbacks from crashing the engine.
This commit is contained in:
Fabio Manganiello 2023-03-10 00:57:24 +01:00
parent 73dc2463f1
commit ceb7a2f098
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
1 changed files with 48 additions and 18 deletions

View File

@ -37,6 +37,11 @@ class EntitiesEngine(Thread):
self.logger = getLogger(name=obj_name)
self._should_stop = Event()
""" Event used to synchronize stop events downstream."""
self._running = Event()
"""
Event used to synchronize other threads to wait for the engine to
start.
"""
self._queue = EntitiesQueue(stop_event=self._should_stop)
""" Queue where all entity upsert requests are received."""
self._repo = EntitiesRepository()
@ -51,6 +56,13 @@ class EntitiesEngine(Thread):
self._queue.put(*entities)
def wait_start(self, timeout: Optional[float] = None) -> None:
started = self._running.wait(timeout=timeout)
if not started:
raise TimeoutError(
f'Timeout waiting for {self.__class__.__name__} to start.'
)
@property
def should_stop(self) -> bool:
return self._should_stop.is_set()
@ -66,30 +78,48 @@ class EntitiesEngine(Thread):
"""
for entity in entities:
get_bus().post(EntityUpdateEvent(entity=entity))
callback = self._callbacks.pop(entity.entity_key, None)
if callback:
self._process_callback(entity)
def _process_callback(self, entity: Entity) -> None:
"""
Process the callback for the given entity.
"""
callback = self._callbacks.pop(entity.entity_key, None)
if callback:
try:
callback(entity)
except Exception as e:
self.logger.error(
'Error while notifying updates for entity ID %d via %s: %s',
entity.id,
callback,
e,
)
self.logger.exception(e)
def run(self):
super().run()
set_thread_name('entities')
self.logger.info('Started entities engine')
self._running.set()
while not self.should_stop:
# Get a batch of entity updates forwarded by other integrations
entities = self._queue.get()
if not entities or self.should_stop:
continue
try:
while not self.should_stop:
# Get a batch of entity updates forwarded by other integrations
entities = self._queue.get()
if not entities or self.should_stop:
continue
# Store the batch of entities
try:
entities = self._repo.save(*entities)
except Exception as e:
self.logger.error('Error while processing entity updates: %s', e)
self.logger.exception(e)
continue
# Store the batch of entities
try:
entities = self._repo.save(*entities)
except Exception as e:
self.logger.error('Error while processing entity updates: %s', e)
self.logger.exception(e)
continue
# Trigger EntityUpdateEvent events
self.notify(*entities)
self.logger.info('Stopped entities engine')
# Trigger EntityUpdateEvent events
self.notify(*entities)
finally:
self.logger.info('Stopped entities engine')
self._running.clear()