From bf7d060b81741204d19f5abb033fd18afc28906e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 17 Aug 2023 02:47:30 +0200 Subject: [PATCH] Added `@ensure_initialized` decorator to actions in `variable`. The `variable` plugin may break in the constructor the first time the application is started. That's because it tries to initialize the cache of stored variables, but the local database hasn't yet been initialized. That's because plugins are registered _before_ the entities engine is initialized, as the entities engine assumes that it already has plugins to scan for entities. Therefore, the initialization of the `variable` plugin's cache should be lazy (only done upon the first call to `get`/`set` etc.), in order to prevent deadlock situations where the plugin waits for the engine to start, but the engine will be initialized only after the plugin is ready. And the lazy initialization logic should also ensure that the entities engine has been properly started (and emit a `TimeoutError` if that's not the case), in order to prevent race conditions. --- platypush/plugins/variable/__init__.py | 59 +++++++++++++++++++++----- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/platypush/plugins/variable/__init__.py b/platypush/plugins/variable/__init__.py index 85520f70d8..5c4f26595a 100644 --- a/platypush/plugins/variable/__init__.py +++ b/platypush/plugins/variable/__init__.py @@ -1,11 +1,45 @@ -from typing import Collection, Dict, Iterable, Optional, Union +from functools import wraps +from threading import Event, RLock +from typing import Any, Callable, Collection, Dict, Iterable, Optional, Union from typing_extensions import override -from platypush.entities import EntityManager +from platypush.entities import EntityManager, get_entities_engine from platypush.entities.variables import Variable from platypush.plugins import Plugin, action +# pylint: disable=protected-access +def ensure_initialized(f: Callable[..., Any]): + """ + Ensures that the entities engine has been initialized before + reading/writing the db. + + It also performs lazy initialization of the variables in the local cache. + """ + + @wraps(f) + def wrapper(*args, **kwargs): + self: VariablePlugin = args[0] + + if not self._initialized.is_set(): + with self._init_lock: + get_entities_engine(timeout=20) + + if not self._initialized.is_set(): + self._initialized.set() + with self._db.get_session() as session: + self._db_vars.update( + { # type: ignore + str(var.name): var.value + for var in session.query(Variable).all() + } + ) + + return f(*args, **kwargs) + + return wrapper + + class VariablePlugin(Plugin, EntityManager): """ This plugin allows you to manipulate context variables that can be @@ -17,18 +51,15 @@ class VariablePlugin(Plugin, EntityManager): def __init__(self, **kwargs): super().__init__(**kwargs) - db = self._db self._db_vars: Dict[str, Optional[str]] = {} """ Local cache for db variables. """ - - with db.get_session() as session: - self._db_vars.update( - { # type: ignore - str(var.name): var.value for var in session.query(Variable).all() - } - ) + self._initialized = Event() + """ Lazy initialization event for the _db_vars map. """ + self._init_lock = RLock() + """ Lock for the _db_vars map initialization. """ @action + @ensure_initialized def get(self, name: Optional[str] = None, default_value=None): """ Get the value of a variable by name from the local db. @@ -45,6 +76,7 @@ class VariablePlugin(Plugin, EntityManager): ) @action + @ensure_initialized def set(self, **kwargs): """ Set a variable or a set of variables on the local db. @@ -57,6 +89,7 @@ class VariablePlugin(Plugin, EntityManager): return kwargs @action + @ensure_initialized def delete(self, name: str): """ Delete a variable from the database. @@ -76,6 +109,7 @@ class VariablePlugin(Plugin, EntityManager): return True @action + @ensure_initialized def unset(self, name: str): """ Unset a variable by name if it's set on the local db. @@ -89,6 +123,7 @@ class VariablePlugin(Plugin, EntityManager): return self.set(**{name: None}) @action + @ensure_initialized def mget(self, name: str): """ Get the value of a variable by name from Redis. @@ -100,6 +135,7 @@ class VariablePlugin(Plugin, EntityManager): return self._redis.mget([name]) @action + @ensure_initialized def mset(self, **kwargs): """ Set a variable or a set of variables on Redis. @@ -112,6 +148,7 @@ class VariablePlugin(Plugin, EntityManager): return kwargs @action + @ensure_initialized def munset(self, name: str): """ Unset a Redis variable by name if it's set @@ -122,6 +159,7 @@ class VariablePlugin(Plugin, EntityManager): return self._redis.delete(name) @action + @ensure_initialized def expire(self, name: str, expire: int): """ Set a variable expiration on Redis @@ -157,6 +195,7 @@ class VariablePlugin(Plugin, EntityManager): @override @action + @ensure_initialized def status(self, *_, **__): variables = { name: value for name, value in self._db_vars.items() if value is not None