[procedures] Sync only after the db engine is initialized.

This commit is contained in:
Fabio Manganiello 2024-10-14 21:45:28 +02:00
parent 585c2f733f
commit 9b99c1e19d
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774

View file

@ -1,10 +1,13 @@
from functools import wraps
import json import json
import re import re
from contextlib import contextmanager from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from multiprocessing import RLock from multiprocessing import RLock
from random import randint from random import randint
from threading import Event
from typing import ( from typing import (
Any,
Callable, Callable,
Collection, Collection,
Dict, Dict,
@ -19,6 +22,7 @@ import yaml
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from platypush.context import get_plugin from platypush.context import get_plugin
from platypush.entities import get_entities_engine
from platypush.entities.managers.procedures import ProcedureEntityManager from platypush.entities.managers.procedures import ProcedureEntityManager
from platypush.entities.procedures import Procedure, ProcedureType from platypush.entities.procedures import Procedure, ProcedureType
from platypush.message.event.entities import EntityDeleteEvent from platypush.message.event.entities import EntityDeleteEvent
@ -29,6 +33,29 @@ from platypush.utils import run
from ._serialize import ProcedureEncoder from ._serialize import ProcedureEncoder
# pylint: disable=protected-access
def ensure_initialized(f: Callable[..., Any]):
"""
Ensures that the entities engine has been initialized before
reading/writing the db.
"""
@wraps(f)
def wrapper(*args, **kwargs):
self: ProceduresPlugin = args[0]
if not self._initialized.is_set():
with self._init_lock:
get_entities_engine(timeout=20)
if not self._initialized.is_set():
self._initialized.set()
return f(*args, **kwargs)
return wrapper
@dataclass @dataclass
class _ProcedureWrapper: class _ProcedureWrapper:
name: str name: str
@ -42,6 +69,8 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._initialized = Event()
self._init_lock = RLock()
self._status_lock = RLock() self._status_lock = RLock()
@action @action
@ -306,6 +335,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
else: else:
return [cls._serialize_action(item) for item in data if item is not None] return [cls._serialize_action(item) for item in data if item is not None]
@ensure_initialized
@contextmanager @contextmanager
def _db_session(self) -> Generator[Session, None, None]: def _db_session(self) -> Generator[Session, None, None]:
db: Optional[DbPlugin] = get_plugin(DbPlugin) db: Optional[DbPlugin] = get_plugin(DbPlugin)