diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index 0f104ed027..7fcdca31a2 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -1,10 +1,13 @@ +from functools import wraps import json import re from contextlib import contextmanager from dataclasses import dataclass from multiprocessing import RLock from random import randint +from threading import Event from typing import ( + Any, Callable, Collection, Dict, @@ -19,6 +22,7 @@ import yaml from sqlalchemy.orm import Session from platypush.context import get_plugin +from platypush.entities import get_entities_engine from platypush.entities.managers.procedures import ProcedureEntityManager from platypush.entities.procedures import Procedure, ProcedureType from platypush.message.event.entities import EntityDeleteEvent @@ -29,6 +33,29 @@ from platypush.utils import run from ._serialize import ProcedureEncoder +# pylint: disable=protected-access +def ensure_initialized(f: Callable[..., Any]): + """ + Ensures that the entities engine has been initialized before + reading/writing the db. + """ + + @wraps(f) + def wrapper(*args, **kwargs): + self: ProceduresPlugin = args[0] + + if not self._initialized.is_set(): + with self._init_lock: + get_entities_engine(timeout=20) + + if not self._initialized.is_set(): + self._initialized.set() + + return f(*args, **kwargs) + + return wrapper + + @dataclass class _ProcedureWrapper: name: str @@ -42,6 +69,8 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + self._initialized = Event() + self._init_lock = RLock() self._status_lock = RLock() @action @@ -306,6 +335,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): else: return [cls._serialize_action(item) for item in data if item is not None] + @ensure_initialized @contextmanager def _db_session(self) -> Generator[Session, None, None]: db: Optional[DbPlugin] = get_plugin(DbPlugin)