diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index 45a0840387..7f5df6369e 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -37,6 +37,11 @@ class EntitiesEngine(Thread): self.logger = getLogger(name=obj_name) self._should_stop = Event() """ Event used to synchronize stop events downstream.""" + self._running = Event() + """ + Event used to synchronize other threads to wait for the engine to + start. + """ self._queue = EntitiesQueue(stop_event=self._should_stop) """ Queue where all entity upsert requests are received.""" self._repo = EntitiesRepository() @@ -51,6 +56,13 @@ class EntitiesEngine(Thread): self._queue.put(*entities) + def wait_start(self, timeout: Optional[float] = None) -> None: + started = self._running.wait(timeout=timeout) + if not started: + raise TimeoutError( + f'Timeout waiting for {self.__class__.__name__} to start.' + ) + @property def should_stop(self) -> bool: return self._should_stop.is_set() @@ -66,30 +78,48 @@ class EntitiesEngine(Thread): """ for entity in entities: get_bus().post(EntityUpdateEvent(entity=entity)) - callback = self._callbacks.pop(entity.entity_key, None) - if callback: + self._process_callback(entity) + + def _process_callback(self, entity: Entity) -> None: + """ + Process the callback for the given entity. + """ + callback = self._callbacks.pop(entity.entity_key, None) + if callback: + try: callback(entity) + except Exception as e: + self.logger.error( + 'Error while notifying updates for entity ID %d via %s: %s', + entity.id, + callback, + e, + ) + self.logger.exception(e) def run(self): super().run() set_thread_name('entities') self.logger.info('Started entities engine') + self._running.set() - while not self.should_stop: - # Get a batch of entity updates forwarded by other integrations - entities = self._queue.get() - if not entities or self.should_stop: - continue + try: + while not self.should_stop: + # Get a batch of entity updates forwarded by other integrations + entities = self._queue.get() + if not entities or self.should_stop: + continue - # Store the batch of entities - try: - entities = self._repo.save(*entities) - except Exception as e: - self.logger.error('Error while processing entity updates: %s', e) - self.logger.exception(e) - continue + # Store the batch of entities + try: + entities = self._repo.save(*entities) + except Exception as e: + self.logger.error('Error while processing entity updates: %s', e) + self.logger.exception(e) + continue - # Trigger EntityUpdateEvent events - self.notify(*entities) - - self.logger.info('Stopped entities engine') + # Trigger EntityUpdateEvent events + self.notify(*entities) + finally: + self.logger.info('Stopped entities engine') + self._running.clear()