From 38438230d76492ce0d9baee83edf83c9137eb95b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 11 Jan 2023 01:22:56 +0100 Subject: [PATCH] The batch of entities currently being processed should have no duplicate keys --- platypush/entities/_engine/repo/db.py | 15 ++++---- platypush/entities/_engine/repo/merger.py | 42 +++++++++++++++++++++++ platypush/plugins/zigbee/mqtt/__init__.py | 2 +- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/platypush/entities/_engine/repo/db.py b/platypush/entities/_engine/repo/db.py index 77a5668c..1114da34 100644 --- a/platypush/entities/_engine/repo/db.py +++ b/platypush/entities/_engine/repo/db.py @@ -83,8 +83,8 @@ class EntitiesDb: already been flushed. """ # Index childrens by parent_id and by parent_key - children_by_parent_id = defaultdict(list) - children_by_parent_key = defaultdict(list) + children_by_parent_id = defaultdict(lambda: defaultdict(Entity)) + children_by_parent_key = defaultdict(lambda: defaultdict(Entity)) for entity in entities: parent_key = None parent_id = entity.parent_id @@ -93,9 +93,9 @@ class EntitiesDb: parent_key = entity.parent.entity_key if parent_id: - children_by_parent_id[parent_id].append(entity) + children_by_parent_id[parent_id][entity.entity_key] = entity if parent_key: - children_by_parent_key[parent_key].append(entity) + children_by_parent_key[parent_key][entity.entity_key] = entity # Find the root entities in the hierarchy (i.e. those that have a null # parent) @@ -131,14 +131,17 @@ class EntitiesDb: # Index the children nodes by key children_to_process = { e.entity_key: e - for e in children_by_parent_key.get(entity.entity_key, []) + for e in children_by_parent_key.get(entity.entity_key, {}).values() } # If this entity has already been persisted, add back its children # that haven't been updated, so we won't lose those connections if entity.id: children_to_process.update( - {e.entity_key: e for e in children_by_parent_id.get(entity.id, [])} + { + e.entity_key: e + for e in children_by_parent_id.get(entity.id, {}).values() + } ) # Add all the updated+inserted+existing children to the next layer diff --git a/platypush/entities/_engine/repo/merger.py b/platypush/entities/_engine/repo/merger.py index cb397c26..b3b88dd4 100644 --- a/platypush/entities/_engine/repo/merger.py +++ b/platypush/entities/_engine/repo/merger.py @@ -51,6 +51,19 @@ class EntitiesMerger: processed_entities = [] existing_entities.update(self._repo.get(session, entities, use_cache=False)) + # Make sure that we have no duplicate entity keys in the current batch + entities = list( + { + **({e.entity_key: e for e in entities}), + **( + { + e.entity_key: e + for e in {str(ee.id): ee for ee in entities if ee.id}.values() + } + ), + }.values() + ) + # Retrieve existing records and merge them for entity in entities: key = entity.entity_key @@ -109,6 +122,7 @@ class EntitiesMerger: else: temp_entity = new_entities.get(parent.entity_key) if temp_entity: + self._remove_duplicate_children(entity, temp_entity) parent = entity.parent = temp_entity else: new_entities[parent.entity_key] = parent @@ -125,6 +139,34 @@ class EntitiesMerger: return parent_id, parent + @staticmethod + def _remove_duplicate_children(entity: Entity, parent: Optional[Entity] = None): + if not parent: + return + + # Make sure that an entity has no duplicate entity IDs among its + # children + existing_child_index_by_id = None + if entity.id: + try: + existing_child_index_by_id = [e.id for e in parent.children].index( + entity.id + ) + parent.children.pop(existing_child_index_by_id) + except ValueError: + pass + + # Make sure that an entity has no duplicate entity keys among its + # children + existing_child_index_by_key = None + try: + existing_child_index_by_key = [e.entity_key for e in parent.children].index( + entity.entity_key + ) + parent.children.pop(existing_child_index_by_key) + except ValueError: + pass + @classmethod def _merge_columns(cls, entity: Entity, existing_entity: Entity) -> Entity: """ diff --git a/platypush/plugins/zigbee/mqtt/__init__.py b/platypush/plugins/zigbee/mqtt/__init__.py index a857962e..46ec1987 100644 --- a/platypush/plugins/zigbee/mqtt/__init__.py +++ b/platypush/plugins/zigbee/mqtt/__init__.py @@ -904,7 +904,7 @@ class ZigbeeMqttPlugin(MqttPlugin): # lgtm [py/missing-call-to-init] workers[device].join(timeout=kwargs.get('timeout')) except Exception as e: self.logger.warning( - 'An error while getting the status of the device {}: {}'.format( + 'An error occurred while getting the status of the device {}: {}'.format( device, str(e) ) )