forked from platypush/platypush
LINT/type fixes
This commit is contained in:
parent
8aedc3c233
commit
72a9a9dfcf
6 changed files with 29 additions and 12 deletions
|
@ -4,7 +4,10 @@ from threading import Thread, Event
|
||||||
from platypush.entities import Entity
|
from platypush.entities import Entity
|
||||||
from platypush.utils import set_thread_name
|
from platypush.utils import set_thread_name
|
||||||
|
|
||||||
|
# pylint: disable=no-name-in-module
|
||||||
from platypush.entities._engine.notifier import EntityNotifier
|
from platypush.entities._engine.notifier import EntityNotifier
|
||||||
|
|
||||||
|
# pylint: disable=no-name-in-module
|
||||||
from platypush.entities._engine.queue import EntitiesQueue
|
from platypush.entities._engine.queue import EntitiesQueue
|
||||||
from platypush.entities._engine.repo import EntitiesRepository
|
from platypush.entities._engine.repo import EntitiesRepository
|
||||||
|
|
||||||
|
@ -18,7 +21,7 @@ class EntitiesEngine(Thread):
|
||||||
1. Consume entities from a queue (synchronized with the upstream
|
1. Consume entities from a queue (synchronized with the upstream
|
||||||
integrations that produce/handle them). The producer/consumer model
|
integrations that produce/handle them). The producer/consumer model
|
||||||
ensure that only this thread writes to the database, packs events
|
ensure that only this thread writes to the database, packs events
|
||||||
together (preventing eccessive writes and throttling events), and
|
together (preventing excessive writes and throttling events), and
|
||||||
prevents race conditions when SQLite is used.
|
prevents race conditions when SQLite is used.
|
||||||
2. Merge any existing entities with their newer representations.
|
2. Merge any existing entities with their newer representations.
|
||||||
3. Update the entities taxonomy.
|
3. Update the entities taxonomy.
|
||||||
|
@ -66,7 +69,7 @@ class EntitiesEngine(Thread):
|
||||||
try:
|
try:
|
||||||
entities = self._repo.save(*entities)
|
entities = self._repo.save(*entities)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.error('Error while processing entity updates: ' + str(e))
|
self.logger.error('Error while processing entity updates: %s', e)
|
||||||
self.logger.exception(e)
|
self.logger.exception(e)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
|
from typing import Set, Tuple
|
||||||
from platypush.context import get_bus
|
from platypush.context import get_bus
|
||||||
from platypush.entities import Entity
|
from platypush.entities import Entity
|
||||||
from platypush.message.event.entities import EntityUpdateEvent
|
from platypush.message.event.entities import EntityUpdateEvent
|
||||||
|
|
||||||
|
# pylint: disable=no-name-in-module
|
||||||
from platypush.entities._engine.repo.cache import EntitiesCache
|
from platypush.entities._engine.repo.cache import EntitiesCache
|
||||||
|
|
||||||
|
|
||||||
|
@ -13,7 +15,7 @@ class EntityNotifier:
|
||||||
|
|
||||||
def __init__(self, cache: EntitiesCache):
|
def __init__(self, cache: EntitiesCache):
|
||||||
self._cache = cache
|
self._cache = cache
|
||||||
self._entities_awaiting_flush = set()
|
self._entities_awaiting_flush: Set[Tuple[str, str]] = set()
|
||||||
|
|
||||||
def _populate_entity_id_from_cache(self, new_entity: Entity):
|
def _populate_entity_id_from_cache(self, new_entity: Entity):
|
||||||
cached_entity = self._cache.get(new_entity)
|
cached_entity = self._cache.get(new_entity)
|
||||||
|
|
|
@ -4,7 +4,11 @@ from typing import Dict, Iterable, Tuple
|
||||||
from sqlalchemy.orm import Session, make_transient
|
from sqlalchemy.orm import Session, make_transient
|
||||||
|
|
||||||
from platypush.entities import Entity
|
from platypush.entities import Entity
|
||||||
|
|
||||||
|
# pylint: disable=no-name-in-module
|
||||||
from platypush.entities._engine.repo.cache import EntitiesCache
|
from platypush.entities._engine.repo.cache import EntitiesCache
|
||||||
|
|
||||||
|
# pylint: disable=no-name-in-module
|
||||||
from platypush.entities._engine.repo.db import EntitiesDb
|
from platypush.entities._engine.repo.db import EntitiesDb
|
||||||
from platypush.entities._engine.repo.merger import EntitiesMerger
|
from platypush.entities._engine.repo.merger import EntitiesMerger
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,8 @@ class EntitiesCache:
|
||||||
if e:
|
if e:
|
||||||
return e
|
return e
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
def update(self, *entities: Entity, overwrite=False):
|
def update(self, *entities: Entity, overwrite=False):
|
||||||
"""
|
"""
|
||||||
Update the cache with a list of new entities.
|
Update the cache with a list of new entities.
|
||||||
|
|
|
@ -82,9 +82,15 @@ class EntitiesDb:
|
||||||
rewired. Otherwise, we may end up with conflicts on entities that have
|
rewired. Otherwise, we may end up with conflicts on entities that have
|
||||||
already been flushed.
|
already been flushed.
|
||||||
"""
|
"""
|
||||||
# Index childrens by parent_id and by parent_key
|
# Index children by parent_id and by parent_key
|
||||||
children_by_parent_id = defaultdict(lambda: defaultdict(Entity))
|
children_by_parent_id: Dict[int, Dict[Tuple[str, str], Entity]] = defaultdict(
|
||||||
children_by_parent_key = defaultdict(lambda: defaultdict(Entity))
|
lambda: defaultdict(Entity)
|
||||||
|
)
|
||||||
|
|
||||||
|
children_by_parent_key: Dict[
|
||||||
|
Tuple[str, str], Dict[Tuple[str, str], Entity]
|
||||||
|
] = defaultdict(lambda: defaultdict(Entity))
|
||||||
|
|
||||||
for entity in entities:
|
for entity in entities:
|
||||||
parent_key = None
|
parent_key = None
|
||||||
parent_id = entity.parent_id
|
parent_id = entity.parent_id
|
||||||
|
@ -113,8 +119,8 @@ class EntitiesDb:
|
||||||
_TaxonomyAwareEntity(entity=e, level=0) for e in root_entities
|
_TaxonomyAwareEntity(entity=e, level=0) for e in root_entities
|
||||||
]
|
]
|
||||||
|
|
||||||
batches = []
|
batches: List[List[Entity]] = []
|
||||||
current_batch = []
|
current_batch: List[_TaxonomyAwareEntity] = []
|
||||||
|
|
||||||
while entities_to_process:
|
while entities_to_process:
|
||||||
# Pop the first element in the list (FIFO implementation)
|
# Pop the first element in the list (FIFO implementation)
|
||||||
|
|
|
@ -12,9 +12,9 @@ class EntitiesMerger:
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, repository):
|
def __init__(self, repository):
|
||||||
from platypush.entities._engine.repo import EntitiesRepository
|
from . import EntitiesRepository
|
||||||
|
|
||||||
self._repo: EntitiesRepository = repository
|
self._repo: EntitiesRepository = repository # type: ignore
|
||||||
|
|
||||||
def merge(
|
def merge(
|
||||||
self,
|
self,
|
||||||
|
@ -26,8 +26,8 @@ class EntitiesMerger:
|
||||||
the parent/child relationships and return a tuple with
|
the parent/child relationships and return a tuple with
|
||||||
``[new_entities, updated_entities]``.
|
``[new_entities, updated_entities]``.
|
||||||
"""
|
"""
|
||||||
new_entities = {}
|
new_entities: Dict[Tuple[str, str], Entity] = {}
|
||||||
existing_entities = {}
|
existing_entities: Dict[Tuple[str, str], Entity] = {}
|
||||||
|
|
||||||
self._merge(
|
self._merge(
|
||||||
session,
|
session,
|
||||||
|
|
Loading…
Reference in a new issue