Compare commits

...

8 commits

Author SHA1 Message Date
531ae947d1 Merge pull request 'Support procedures as native entities' (#425) from 341/procedure-entities into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #425
2024-08-28 00:17:58 +02:00
d9916873cb
[procedures] Store actions for YAML procedures.
All checks were successful
continuous-integration/drone/push Build is passing
2024-08-28 00:14:54 +02:00
234963b069
🐛 Fixed import error 2024-08-28 00:14:21 +02:00
ffc3fe218d
[config] Added config.get_config_dir method. 2024-08-28 00:13:46 +02:00
0f186c44ef
[#341] Support for procedure reconciliation.
All checks were successful
continuous-integration/drone/push Build is passing
If some procedures are removed either from the configuration or from the
loaded scripts, then their associated entities should also be removed
from the database when the `procedures` plugin is loaded.
2024-08-25 23:45:11 +02:00
06781cd72c
[#341] Backend implementation of the new procedure entities architecture. 2024-08-25 16:06:56 +02:00
24f7d4a789
Merge branch 'master' into 341/procedure-entities 2024-08-25 14:29:43 +02:00
af21ff13ff
[WIP]
All checks were successful
continuous-integration/drone/push Build is passing
2024-08-13 22:27:10 +02:00
9 changed files with 213 additions and 3 deletions

View file

@ -272,6 +272,7 @@ class Config:
@property @property
def _core_plugins(self) -> Dict[str, dict]: def _core_plugins(self) -> Dict[str, dict]:
return { return {
'procedures': {},
'variable': {}, 'variable': {},
} }
@ -422,6 +423,7 @@ class Config:
self.procedures[procedure_name] = { self.procedures[procedure_name] = {
'_async': _async, '_async': _async,
'type': 'config',
'actions': component, 'actions': component,
'args': args, 'args': args,
} }

View file

@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from typing import Callable, Dict, Union
from platypush.config import Config
from . import EntityManager
class ProcedureEntityManager(EntityManager, ABC):
"""
Base class for integrations that can run and manage procedures.
"""
@abstractmethod
def exec(self, procedure: str, *args, **kwargs):
"""
Run a procedure.
:param procedure: Procedure to run, by name.
:param args: Arguments to pass to the procedure.
:param kwargs: Keyword arguments to pass to the procedure.
"""
raise NotImplementedError()
@property
def _all_procedures(self) -> Dict[str, Union[dict, Callable]]:
"""
:return: All the procedures that can be run by this entity manager.
"""
return Config.get_procedures()

View file

@ -0,0 +1,43 @@
import logging
from sqlalchemy import (
Column,
Enum,
ForeignKey,
Integer,
JSON,
String,
)
from platypush.common.db import is_defined
from . import Entity
logger = logging.getLogger(__name__)
if not is_defined('procedure'):
class Procedure(Entity):
"""
Models a procedure entity.
"""
__tablename__ = 'procedure'
id = Column(
Integer, ForeignKey('entity.id', ondelete='CASCADE'), primary_key=True
)
args = Column(JSON, nullable=False, default=[])
procedure_type = Column(
Enum('python', 'config', name='procedure_type'), nullable=False
)
module = Column(String)
source = Column(String)
line = Column(Integer)
actions = Column(JSON)
__table_args__ = {'keep_existing': True}
__mapper_args__ = {
'polymorphic_identity': __tablename__,
}

View file

@ -1,4 +1,5 @@
import json import json
import os
from platypush import Config from platypush import Config
from platypush.message import Message from platypush.message import Message
@ -66,5 +67,12 @@ class ConfigPlugin(Plugin):
""" """
return Config._instance.config_file return Config._instance.config_file
@action
def get_config_dir(self) -> str:
"""
:return: The path to the configuration directory.
"""
return os.path.dirname(Config._instance.config_file)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -12,6 +12,7 @@ from platypush.common.db import override_definitions
from platypush.common.reflection import Integration, Message as MessageMetadata from platypush.common.reflection import Integration, Message as MessageMetadata
from platypush.config import Config from platypush.config import Config
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
from platypush.plugins.procedures import ProcedureEncoder
from platypush.message import Message from platypush.message import Message
from platypush.message.event import Event from platypush.message.event import Event
from platypush.message.response import Response from platypush.message.response import Response
@ -20,7 +21,6 @@ from platypush.utils.mock import auto_mocks
from platypush.utils.manifest import Manifest, Manifests, PackageManagers from platypush.utils.manifest import Manifest, Manifests, PackageManagers
from ._cache import Cache from ._cache import Cache
from ._serialize import ProcedureEncoder
class InspectPlugin(Plugin): class InspectPlugin(Plugin):

View file

@ -0,0 +1,117 @@
import json
from dataclasses import dataclass
from typing import Callable, Collection, Optional, Union
from platypush.context import get_plugin
from platypush.entities.managers.procedures import ProcedureEntityManager
from platypush.entities.procedures import Procedure
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.db import DbPlugin
from platypush.utils import run
from ._serialize import ProcedureEncoder
@dataclass
class _ProcedureWrapper:
name: str
obj: Union[dict, Callable]
class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
"""
Utility plugin to run and store procedures as native entities.
"""
@action
def exec(self, procedure: str, *args, **kwargs):
return run(f'procedure.{procedure}', *args, **kwargs)
def _convert_procedure(self, name: str, proc: Union[dict, Callable]) -> Procedure:
metadata = self._serialize_procedure(proc, name=name)
return Procedure(
id=name,
name=name,
plugin=self,
procedure_type=metadata['type'],
module=metadata.get('module'),
source=metadata.get('source'),
line=metadata.get('line'),
args=metadata.get('args', []),
actions=metadata.get('actions', []),
)
@action
def status(self, *_, **__):
"""
:return: The serialized configured procedures. Format:
.. code-block:: json
{
"procedure_name": {
"type": "python",
"module": "module_name",
"source": "/path/to/source.py",
"line": 42,
"args": ["arg1", "arg2"]
}
}
"""
self.publish_entities(self._get_wrapped_procedures())
return self._get_serialized_procedures()
def transform_entities(
self, entities: Collection[_ProcedureWrapper], **_
) -> Collection[Procedure]:
return [
self._convert_procedure(name=proc.name, proc=proc.obj) for proc in entities
]
def _get_wrapped_procedures(self) -> Collection[_ProcedureWrapper]:
return [
_ProcedureWrapper(name=name, obj=proc)
for name, proc in self._all_procedures.items()
]
def _sync_db_procedures(self):
cur_proc_names = set(self._all_procedures.keys())
db: Optional[DbPlugin] = get_plugin(DbPlugin)
assert db, 'No database plugin configured'
with db.get_session(
autoflush=False, autocommit=False, expire_on_commit=False
) as session:
procs_to_remove = (
session.query(Procedure)
.filter(Procedure.name.not_in(cur_proc_names))
.all()
)
for proc in procs_to_remove:
self.logger.info('Removing stale procedure record for %s', proc.name)
session.delete(proc)
@staticmethod
def _serialize_procedure(
proc: Union[dict, Callable], name: Optional[str] = None
) -> dict:
ret = json.loads(json.dumps(proc, cls=ProcedureEncoder))
if name:
ret['name'] = name
return ret
def _get_serialized_procedures(self) -> dict:
return {
name: self._serialize_procedure(proc, name=name)
for name, proc in self._all_procedures.items()
}
def main(self, *_, **__):
self._sync_db_procedures()
while not self.should_stop():
self.publish_entities(self._get_wrapped_procedures())
self.wait_stop()

View file

@ -10,9 +10,10 @@ class ProcedureEncoder(json.JSONEncoder):
def default(self, o): def default(self, o):
if callable(o): if callable(o):
return { return {
'type': 'native_function', 'type': 'python',
'module': o.__module__, 'module': o.__module__,
'source': inspect.getsourcefile(o), 'source': getattr(o, "_source", inspect.getsourcefile(o)),
'line': getattr(o, "_line", inspect.getsourcelines(o)[1]),
'args': [ 'args': [
name name
for name, arg in inspect.signature(o).parameters.items() for name, arg in inspect.signature(o).parameters.items()

View file

@ -0,0 +1,7 @@
{
"manifest": {
"package": "platypush.plugins.procedure",
"type": "plugin",
"events": []
}
}

View file

@ -565,9 +565,12 @@ def procedure(name_or_func: Optional[str] = None, *upper_args, **upper_kwargs):
""" """
Public decorator to mark a function as a procedure. Public decorator to mark a function as a procedure.
""" """
import inspect
f.procedure = True f.procedure = True
f.procedure_name = name f.procedure_name = name
f._source = inspect.getsourcefile(f) # pylint: disable=protected-access
f._line = inspect.getsourcelines(f)[1] # pylint: disable=protected-access
@wraps(f) @wraps(f)
def _execute_procedure(*args, **kwargs): def _execute_procedure(*args, **kwargs):