The batch of entities currently being processed should have no duplicate keys

Fabio Manganiello 2023-01-11 01:22:56 +01:00
parent 4a2851231c
commit 38438230d7
Signed by: blacklight
GPG key ID: D90FBA7F76362774
3 changed files with 52 additions and 7 deletions


@@ -83,8 +83,8 @@ class EntitiesDb:
         already been flushed.
         """
         # Index childrens by parent_id and by parent_key
-        children_by_parent_id = defaultdict(list)
-        children_by_parent_key = defaultdict(list)
+        children_by_parent_id = defaultdict(lambda: defaultdict(Entity))
+        children_by_parent_key = defaultdict(lambda: defaultdict(Entity))
         for entity in entities:
             parent_key = None
             parent_id = entity.parent_id
@@ -93,9 +93,9 @@ class EntitiesDb:
                 parent_key = entity.parent.entity_key

             if parent_id:
-                children_by_parent_id[parent_id].append(entity)
+                children_by_parent_id[parent_id][entity.entity_key] = entity
             if parent_key:
-                children_by_parent_key[parent_key].append(entity)
+                children_by_parent_key[parent_key][entity.entity_key] = entity

         # Find the root entities in the hierarchy (i.e. those that have a null
         # parent)
@@ -131,14 +131,17 @@ class EntitiesDb:
             # Index the children nodes by key
             children_to_process = {
                 e.entity_key: e
-                for e in children_by_parent_key.get(entity.entity_key, [])
+                for e in children_by_parent_key.get(entity.entity_key, {}).values()
             }

             # If this entity has already been persisted, add back its children
             # that haven't been updated, so we won't lose those connections
             if entity.id:
                 children_to_process.update(
-                    {e.entity_key: e for e in children_by_parent_id.get(entity.id, [])}
+                    {
+                        e.entity_key: e
+                        for e in children_by_parent_id.get(entity.id, {}).values()
+                    }
                 )

             # Add all the updated+inserted+existing children to the next layer
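
A note on the data-structure change above: replacing defaultdict(list) with a per-parent mapping keyed by entity_key is what makes duplicate children collapse into a single entry instead of piling up. Below is a minimal, self-contained sketch of that idea; FakeEntity and its entity_key property are invented stand-ins for illustration (not platypush's real Entity model), and a plain inner dict is used in place of defaultdict(Entity) since the default factory does not matter here.

from collections import defaultdict
from dataclasses import dataclass
from typing import Optional

@dataclass
class FakeEntity:
    # Invented stand-in: just enough attributes to run the example
    id: Optional[int]
    external_id: str
    plugin: str
    parent_id: Optional[int] = None

    @property
    def entity_key(self):
        # Assumption for the sketch: the key is an (external_id, plugin)-like tuple
        return (self.external_id, self.plugin)

batch = [
    FakeEntity(1, 'light_1', 'hue', parent_id=10),
    FakeEntity(2, 'light_1', 'hue', parent_id=10),  # same key, same parent
]

# Before: a plain list per parent keeps both copies of the same child
children_by_parent_id = defaultdict(list)
for e in batch:
    children_by_parent_id[e.parent_id].append(e)
assert len(children_by_parent_id[10]) == 2

# After: a per-parent dict keyed by entity_key keeps only the latest copy
children_by_parent_id = defaultdict(dict)
for e in batch:
    children_by_parent_id[e.parent_id][e.entity_key] = e
assert len(children_by_parent_id[10]) == 1

The .values() calls added in the last hunk are the matching consumer-side change: callers now iterate the mapping's values where they previously iterated a list.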


@@ -51,6 +51,19 @@ class EntitiesMerger:
         processed_entities = []
         existing_entities.update(self._repo.get(session, entities, use_cache=False))

+        # Make sure that we have no duplicate entity keys in the current batch
+        entities = list(
+            {
+                **({e.entity_key: e for e in entities}),
+                **(
+                    {
+                        e.entity_key: e
+                        for e in {str(ee.id): ee for ee in entities if ee.id}.values()
+                    }
+                ),
+            }.values()
+        )
+
         # Retrieve existing records and merge them
         for entity in entities:
             key = entity.entity_key
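
Unpacked, the deduplication expression above builds one dict keyed by entity_key from the whole batch, then overlays a second dict built only from entities that already carry a database id, so the persisted copy wins whenever both are present. A small worked example, using an invented Stub class with just the two attributes the expression touches:

class Stub:
    # Invented stand-in: only id and entity_key are needed
    def __init__(self, id, entity_key):
        self.id = id
        self.entity_key = entity_key

persisted = Stub(id=42, entity_key=('sensor_1', 'zigbee'))
fresh = Stub(id=None, entity_key=('sensor_1', 'zigbee'))
entities = [persisted, fresh]

entities = list(
    {
        # last occurrence per entity_key wins...
        **{e.entity_key: e for e in entities},
        # ...unless some occurrence has a persisted id, which takes priority
        **{
            e.entity_key: e
            for e in {str(ee.id): ee for ee in entities if ee.id}.values()
        },
    }.values()
)

assert entities == [persisted]  # one entry per key, persisted copy preferred

Keying the inner dict by str(ee.id) also collapses any two batch entries that share the same persisted id, even if their entity keys differ.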
@@ -109,6 +122,7 @@ class EntitiesMerger:
             else:
                 temp_entity = new_entities.get(parent.entity_key)
                 if temp_entity:
+                    self._remove_duplicate_children(entity, temp_entity)
                     parent = entity.parent = temp_entity
                 else:
                     new_entities[parent.entity_key] = parent
@@ -125,6 +139,34 @@ class EntitiesMerger:

         return parent_id, parent

+    @staticmethod
+    def _remove_duplicate_children(entity: Entity, parent: Optional[Entity] = None):
+        if not parent:
+            return
+
+        # Make sure that an entity has no duplicate entity IDs among its
+        # children
+        existing_child_index_by_id = None
+        if entity.id:
+            try:
+                existing_child_index_by_id = [e.id for e in parent.children].index(
+                    entity.id
+                )
+                parent.children.pop(existing_child_index_by_id)
+            except ValueError:
+                pass
+
+        # Make sure that an entity has no duplicate entity keys among its
+        # children
+        existing_child_index_by_key = None
+        try:
+            existing_child_index_by_key = [e.entity_key for e in parent.children].index(
+                entity.entity_key
+            )
+            parent.children.pop(existing_child_index_by_key)
+        except ValueError:
+            pass
+
     @classmethod
     def _merge_columns(cls, entity: Entity, existing_entity: Entity) -> Entity:
         """


@@ -904,7 +904,7 @@ class ZigbeeMqttPlugin(MqttPlugin):  # lgtm [py/missing-call-to-init]
                     workers[device].join(timeout=kwargs.get('timeout'))
                 except Exception as e:
                     self.logger.warning(
-                        'An error while getting the status of the device {}: {}'.format(
+                        'An error occurred while getting the status of the device {}: {}'.format(
                             device, str(e)
                         )
                     )