[#341] Support for procedure reconciliation.

If some procedures are removed either from the configuration or from the
loaded scripts, then their associated entities should also be removed
from the database when the `procedures` plugin is loaded.
This commit is contained in:
Fabio Manganiello 2024-08-25 23:45:11 +02:00
parent 99909c73ab
commit dea72fbfdb

View file

@ -2,9 +2,11 @@ import json
from dataclasses import dataclass from dataclasses import dataclass
from typing import Callable, Collection, Optional, Union from typing import Callable, Collection, Optional, Union
from platypush.context import get_plugin
from platypush.entities.managers.procedures import ProcedureEntityManager from platypush.entities.managers.procedures import ProcedureEntityManager
from platypush.entities.procedures import Procedure from platypush.entities.procedures import Procedure
from platypush.plugins import RunnablePlugin, action from platypush.plugins import RunnablePlugin, action
from platypush.plugins.db import DbPlugin
from platypush.utils import run from platypush.utils import run
from ._serialize import ProcedureEncoder from ._serialize import ProcedureEncoder
@ -72,6 +74,24 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
for name, proc in self._all_procedures.items() for name, proc in self._all_procedures.items()
] ]
def _sync_db_procedures(self):
cur_proc_names = set(self._all_procedures.keys())
db: Optional[DbPlugin] = get_plugin(DbPlugin)
assert db, 'No database plugin configured'
with db.get_session(
autoflush=False, autocommit=False, expire_on_commit=False
) as session:
procs_to_remove = (
session.query(Procedure)
.filter(Procedure.name.not_in(cur_proc_names))
.all()
)
for proc in procs_to_remove:
self.logger.info('Removing stale procedure record for %s', proc.name)
session.delete(proc)
@staticmethod @staticmethod
def _serialize_procedure( def _serialize_procedure(
proc: Union[dict, Callable], name: Optional[str] = None proc: Union[dict, Callable], name: Optional[str] = None
@ -89,6 +109,8 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
} }
def main(self, *_, **__): def main(self, *_, **__):
self._sync_db_procedures()
while not self.should_stop(): while not self.should_stop():
self.publish_entities(self._get_wrapped_procedures()) self.publish_entities(self._get_wrapped_procedures())
self.wait_stop() self.wait_stop()