forked from platypush/platypush
Better dynamic entities discovery
This commit is contained in:
parent
332c91252c
commit
e6bfa1c50f
5 changed files with 43 additions and 37 deletions
|
@ -143,44 +143,30 @@ export default {
|
||||||
},
|
},
|
||||||
|
|
||||||
async refresh() {
|
async refresh() {
|
||||||
const actions = Object.keys(
|
this.loadingEntities = Object.entries(this.entities).reduce((obj, [id, entity]) => {
|
||||||
Object.values(this.selector.selectedEntities).reduce((obj, entity) => {
|
const self = this
|
||||||
if (entity.plugin)
|
if (this.entityTimeouts[id])
|
||||||
obj[entity.plugin] = true
|
clearTimeout(this.entityTimeouts[id])
|
||||||
return obj
|
|
||||||
}, {})
|
|
||||||
).map((plugin) => `${plugin}.status`)
|
|
||||||
|
|
||||||
this.loadingEntities = {
|
this.entityTimeouts[id] = setTimeout(() => {
|
||||||
...this.loadingEntities,
|
if (self.loadingEntities[id])
|
||||||
...Object.keys(this.selector.selectedEntities).reduce((obj, id) => {
|
delete self.loadingEntities[id]
|
||||||
const self = this
|
if (self.entityTimeouts[id])
|
||||||
const entity = this.entities[id]
|
delete self.entityTimeouts[id]
|
||||||
|
|
||||||
if (this.entityTimeouts[id])
|
self.errorEntities[id] = entity
|
||||||
clearTimeout(this.entityTimeouts[id])
|
self.notify({
|
||||||
|
error: true,
|
||||||
|
title: entity.plugin,
|
||||||
|
text: `Scan timeout for ${entity.name}`,
|
||||||
|
})
|
||||||
|
}, this.entityScanTimeout * 1000)
|
||||||
|
|
||||||
this.entityTimeouts[id] = setTimeout(() => {
|
obj[id] = true
|
||||||
if (self.loadingEntities[id])
|
return obj
|
||||||
delete self.loadingEntities[id]
|
}, {})
|
||||||
if (self.entityTimeouts[id])
|
|
||||||
delete self.entityTimeouts[id]
|
|
||||||
|
|
||||||
self.errorEntities[id] = entity
|
await this.request('entities.scan')
|
||||||
self.notify({
|
|
||||||
error: true,
|
|
||||||
title: entity.plugin,
|
|
||||||
text: `Scan timeout for ${entity.name}`,
|
|
||||||
})
|
|
||||||
}, this.entityScanTimeout * 1000)
|
|
||||||
|
|
||||||
obj[id] = true
|
|
||||||
return obj
|
|
||||||
}, {}),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Force refresh by calling `.status` on all the selected plugins
|
|
||||||
await Promise.all(actions.map((act) => this.request(act)))
|
|
||||||
},
|
},
|
||||||
|
|
||||||
async sync() {
|
async sync() {
|
||||||
|
@ -231,6 +217,7 @@ export default {
|
||||||
...event.entity,
|
...event.entity,
|
||||||
meta: {
|
meta: {
|
||||||
...(this.entities[entityId]?.meta || {}),
|
...(this.entities[entityId]?.meta || {}),
|
||||||
|
...(meta[event.entity.type] || {}),
|
||||||
...(event.entity?.meta || {}),
|
...(event.entity?.meta || {}),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import json
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
from queue import Queue, Empty
|
from queue import Queue, Empty
|
||||||
from threading import Thread, Event, RLock
|
from threading import Thread, Event, RLock
|
||||||
|
@ -23,6 +24,7 @@ class EntitiesEngine(Thread):
|
||||||
self.logger = getLogger(name=obj_name)
|
self.logger = getLogger(name=obj_name)
|
||||||
self._queue = Queue()
|
self._queue = Queue()
|
||||||
self._should_stop = Event()
|
self._should_stop = Event()
|
||||||
|
self._entities_awaiting_flush = set()
|
||||||
self._entities_cache_lock = RLock()
|
self._entities_cache_lock = RLock()
|
||||||
self._entities_cache = {
|
self._entities_cache = {
|
||||||
'by_id': {},
|
'by_id': {},
|
||||||
|
@ -110,6 +112,16 @@ class EntitiesEngine(Thread):
|
||||||
self._populate_entity_id_from_cache(entity)
|
self._populate_entity_id_from_cache(entity)
|
||||||
if entity.id:
|
if entity.id:
|
||||||
get_bus().post(EntityUpdateEvent(entity=entity))
|
get_bus().post(EntityUpdateEvent(entity=entity))
|
||||||
|
else:
|
||||||
|
self._entities_awaiting_flush.add(self._to_entity_awaiting_flush(entity))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _to_entity_awaiting_flush(entity: Entity):
|
||||||
|
e = entity.to_json()
|
||||||
|
return json.dumps(
|
||||||
|
{k: v for k, v in e.items() if k in {'external_id', 'name', 'plugin'}},
|
||||||
|
sort_keys=True,
|
||||||
|
)
|
||||||
|
|
||||||
def post(self, *entities: Entity):
|
def post(self, *entities: Entity):
|
||||||
for entity in entities:
|
for entity in entities:
|
||||||
|
@ -226,3 +238,10 @@ class EntitiesEngine(Thread):
|
||||||
with self._entities_cache_lock:
|
with self._entities_cache_lock:
|
||||||
for entity in entities:
|
for entity in entities:
|
||||||
self._cache_entities(entity, overwrite_cache=True)
|
self._cache_entities(entity, overwrite_cache=True)
|
||||||
|
|
||||||
|
entities_awaiting_flush = {*self._entities_awaiting_flush}
|
||||||
|
for entity in entities:
|
||||||
|
e = self._to_entity_awaiting_flush(entity)
|
||||||
|
if e in entities_awaiting_flush:
|
||||||
|
self._process_event(entity)
|
||||||
|
self._entities_awaiting_flush.remove(e)
|
||||||
|
|
|
@ -6,7 +6,7 @@ from ._base import Entity
|
||||||
class Device(Entity):
|
class Device(Entity):
|
||||||
__tablename__ = 'device'
|
__tablename__ = 'device'
|
||||||
|
|
||||||
id = Column(Integer, ForeignKey(Entity.id), primary_key=True)
|
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'polymorphic_identity': __tablename__,
|
'polymorphic_identity': __tablename__,
|
||||||
|
|
|
@ -6,7 +6,7 @@ from .devices import Device
|
||||||
class Light(Device):
|
class Light(Device):
|
||||||
__tablename__ = 'light'
|
__tablename__ = 'light'
|
||||||
|
|
||||||
id = Column(Integer, ForeignKey(Device.id), primary_key=True)
|
id = Column(Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True)
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
'polymorphic_identity': __tablename__,
|
'polymorphic_identity': __tablename__,
|
||||||
|
|
|
@ -6,7 +6,7 @@ from .devices import Device
|
||||||
class Switch(Device):
|
class Switch(Device):
|
||||||
__tablename__ = 'switch'
|
__tablename__ = 'switch'
|
||||||
|
|
||||||
id = Column(Integer, ForeignKey(Device.id), primary_key=True)
|
id = Column(Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True)
|
||||||
state = Column(Boolean)
|
state = Column(Boolean)
|
||||||
|
|
||||||
__mapper_args__ = {
|
__mapper_args__ = {
|
||||||
|
|
Loading…
Reference in a new issue