diff --git a/platypush/entities/_engine/__init__.py b/platypush/entities/_engine/__init__.py index db071186d..19ead9727 100644 --- a/platypush/entities/_engine/__init__.py +++ b/platypush/entities/_engine/__init__.py @@ -4,7 +4,10 @@ from threading import Thread, Event from platypush.entities import Entity from platypush.utils import set_thread_name +# pylint: disable=no-name-in-module from platypush.entities._engine.notifier import EntityNotifier + +# pylint: disable=no-name-in-module from platypush.entities._engine.queue import EntitiesQueue from platypush.entities._engine.repo import EntitiesRepository @@ -18,7 +21,7 @@ class EntitiesEngine(Thread): 1. Consume entities from a queue (synchronized with the upstream integrations that produce/handle them). The producer/consumer model ensure that only this thread writes to the database, packs events - together (preventing eccessive writes and throttling events), and + together (preventing excessive writes and throttling events), and prevents race conditions when SQLite is used. 2. Merge any existing entities with their newer representations. 3. Update the entities taxonomy. @@ -66,7 +69,7 @@ class EntitiesEngine(Thread): try: entities = self._repo.save(*entities) except Exception as e: - self.logger.error('Error while processing entity updates: ' + str(e)) + self.logger.error('Error while processing entity updates: %s', e) self.logger.exception(e) continue diff --git a/platypush/entities/_engine/notifier.py b/platypush/entities/_engine/notifier.py index cc45fece2..92c32ec96 100644 --- a/platypush/entities/_engine/notifier.py +++ b/platypush/entities/_engine/notifier.py @@ -1,7 +1,9 @@ +from typing import Set, Tuple from platypush.context import get_bus from platypush.entities import Entity from platypush.message.event.entities import EntityUpdateEvent +# pylint: disable=no-name-in-module from platypush.entities._engine.repo.cache import EntitiesCache @@ -13,7 +15,7 @@ class EntityNotifier: def __init__(self, cache: EntitiesCache): self._cache = cache - self._entities_awaiting_flush = set() + self._entities_awaiting_flush: Set[Tuple[str, str]] = set() def _populate_entity_id_from_cache(self, new_entity: Entity): cached_entity = self._cache.get(new_entity) diff --git a/platypush/entities/_engine/repo/__init__.py b/platypush/entities/_engine/repo/__init__.py index ed355f341..73be43f70 100644 --- a/platypush/entities/_engine/repo/__init__.py +++ b/platypush/entities/_engine/repo/__init__.py @@ -4,7 +4,11 @@ from typing import Dict, Iterable, Tuple from sqlalchemy.orm import Session, make_transient from platypush.entities import Entity + +# pylint: disable=no-name-in-module from platypush.entities._engine.repo.cache import EntitiesCache + +# pylint: disable=no-name-in-module from platypush.entities._engine.repo.db import EntitiesDb from platypush.entities._engine.repo.merger import EntitiesMerger diff --git a/platypush/entities/_engine/repo/cache.py b/platypush/entities/_engine/repo/cache.py index 201714103..c38131a37 100644 --- a/platypush/entities/_engine/repo/cache.py +++ b/platypush/entities/_engine/repo/cache.py @@ -30,6 +30,8 @@ class EntitiesCache: if e: return e + return None + def update(self, *entities: Entity, overwrite=False): """ Update the cache with a list of new entities. diff --git a/platypush/entities/_engine/repo/db.py b/platypush/entities/_engine/repo/db.py index 1114da34b..7dbc6a076 100644 --- a/platypush/entities/_engine/repo/db.py +++ b/platypush/entities/_engine/repo/db.py @@ -82,9 +82,15 @@ class EntitiesDb: rewired. Otherwise, we may end up with conflicts on entities that have already been flushed. """ - # Index childrens by parent_id and by parent_key - children_by_parent_id = defaultdict(lambda: defaultdict(Entity)) - children_by_parent_key = defaultdict(lambda: defaultdict(Entity)) + # Index children by parent_id and by parent_key + children_by_parent_id: Dict[int, Dict[Tuple[str, str], Entity]] = defaultdict( + lambda: defaultdict(Entity) + ) + + children_by_parent_key: Dict[ + Tuple[str, str], Dict[Tuple[str, str], Entity] + ] = defaultdict(lambda: defaultdict(Entity)) + for entity in entities: parent_key = None parent_id = entity.parent_id @@ -113,8 +119,8 @@ class EntitiesDb: _TaxonomyAwareEntity(entity=e, level=0) for e in root_entities ] - batches = [] - current_batch = [] + batches: List[List[Entity]] = [] + current_batch: List[_TaxonomyAwareEntity] = [] while entities_to_process: # Pop the first element in the list (FIFO implementation) diff --git a/platypush/entities/_engine/repo/merger.py b/platypush/entities/_engine/repo/merger.py index b3b88dd45..a6bc3ea5d 100644 --- a/platypush/entities/_engine/repo/merger.py +++ b/platypush/entities/_engine/repo/merger.py @@ -12,9 +12,9 @@ class EntitiesMerger: """ def __init__(self, repository): - from platypush.entities._engine.repo import EntitiesRepository + from . import EntitiesRepository - self._repo: EntitiesRepository = repository + self._repo: EntitiesRepository = repository # type: ignore def merge( self, @@ -26,8 +26,8 @@ class EntitiesMerger: the parent/child relationships and return a tuple with ``[new_entities, updated_entities]``. """ - new_entities = {} - existing_entities = {} + new_entities: Dict[Tuple[str, str], Entity] = {} + existing_entities: Dict[Tuple[str, str], Entity] = {} self._merge( session,