forked from platypush/platypush
Added @ensure_initialized
decorator to actions in variable
.
The `variable` plugin may break in the constructor the first time the application is started. That's because it tries to initialize the cache of stored variables, but the local database hasn't yet been initialized. That's because plugins are registered _before_ the entities engine is initialized, as the entities engine assumes that it already has plugins to scan for entities. Therefore, the initialization of the `variable` plugin's cache should be lazy (only done upon the first call to `get`/`set` etc.), in order to prevent deadlock situations where the plugin waits for the engine to start, but the engine will be initialized only after the plugin is ready. And the lazy initialization logic should also ensure that the entities engine has been properly started (and emit a `TimeoutError` if that's not the case), in order to prevent race conditions.
This commit is contained in:
parent
adfedfa2dd
commit
bf7d060b81
1 changed files with 49 additions and 10 deletions
|
@ -1,11 +1,45 @@
|
|||
from typing import Collection, Dict, Iterable, Optional, Union
|
||||
from functools import wraps
|
||||
from threading import Event, RLock
|
||||
from typing import Any, Callable, Collection, Dict, Iterable, Optional, Union
|
||||
from typing_extensions import override
|
||||
|
||||
from platypush.entities import EntityManager
|
||||
from platypush.entities import EntityManager, get_entities_engine
|
||||
from platypush.entities.variables import Variable
|
||||
from platypush.plugins import Plugin, action
|
||||
|
||||
|
||||
# pylint: disable=protected-access
|
||||
def ensure_initialized(f: Callable[..., Any]):
|
||||
"""
|
||||
Ensures that the entities engine has been initialized before
|
||||
reading/writing the db.
|
||||
|
||||
It also performs lazy initialization of the variables in the local cache.
|
||||
"""
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
self: VariablePlugin = args[0]
|
||||
|
||||
if not self._initialized.is_set():
|
||||
with self._init_lock:
|
||||
get_entities_engine(timeout=20)
|
||||
|
||||
if not self._initialized.is_set():
|
||||
self._initialized.set()
|
||||
with self._db.get_session() as session:
|
||||
self._db_vars.update(
|
||||
{ # type: ignore
|
||||
str(var.name): var.value
|
||||
for var in session.query(Variable).all()
|
||||
}
|
||||
)
|
||||
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
class VariablePlugin(Plugin, EntityManager):
|
||||
"""
|
||||
This plugin allows you to manipulate context variables that can be
|
||||
|
@ -17,18 +51,15 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
db = self._db
|
||||
self._db_vars: Dict[str, Optional[str]] = {}
|
||||
""" Local cache for db variables. """
|
||||
|
||||
with db.get_session() as session:
|
||||
self._db_vars.update(
|
||||
{ # type: ignore
|
||||
str(var.name): var.value for var in session.query(Variable).all()
|
||||
}
|
||||
)
|
||||
self._initialized = Event()
|
||||
""" Lazy initialization event for the _db_vars map. """
|
||||
self._init_lock = RLock()
|
||||
""" Lock for the _db_vars map initialization. """
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def get(self, name: Optional[str] = None, default_value=None):
|
||||
"""
|
||||
Get the value of a variable by name from the local db.
|
||||
|
@ -45,6 +76,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
)
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def set(self, **kwargs):
|
||||
"""
|
||||
Set a variable or a set of variables on the local db.
|
||||
|
@ -57,6 +89,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
return kwargs
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def delete(self, name: str):
|
||||
"""
|
||||
Delete a variable from the database.
|
||||
|
@ -76,6 +109,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
return True
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def unset(self, name: str):
|
||||
"""
|
||||
Unset a variable by name if it's set on the local db.
|
||||
|
@ -89,6 +123,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
return self.set(**{name: None})
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def mget(self, name: str):
|
||||
"""
|
||||
Get the value of a variable by name from Redis.
|
||||
|
@ -100,6 +135,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
return self._redis.mget([name])
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def mset(self, **kwargs):
|
||||
"""
|
||||
Set a variable or a set of variables on Redis.
|
||||
|
@ -112,6 +148,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
return kwargs
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def munset(self, name: str):
|
||||
"""
|
||||
Unset a Redis variable by name if it's set
|
||||
|
@ -122,6 +159,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
return self._redis.delete(name)
|
||||
|
||||
@action
|
||||
@ensure_initialized
|
||||
def expire(self, name: str, expire: int):
|
||||
"""
|
||||
Set a variable expiration on Redis
|
||||
|
@ -157,6 +195,7 @@ class VariablePlugin(Plugin, EntityManager):
|
|||
|
||||
@override
|
||||
@action
|
||||
@ensure_initialized
|
||||
def status(self, *_, **__):
|
||||
variables = {
|
||||
name: value for name, value in self._db_vars.items() if value is not None
|
||||
|
|
Loading…
Reference in a new issue