Support procedures as native entities #425

Merged
blacklight merged 7 commits from 341/procedure-entities into master 2024-08-28 00:17:59 +02:00
9 changed files with 213 additions and 3 deletions

View file

@ -272,6 +272,7 @@ class Config:
@property
def _core_plugins(self) -> Dict[str, dict]:
return {
'procedures': {},
'variable': {},
}
@ -422,6 +423,7 @@ class Config:
self.procedures[procedure_name] = {
'_async': _async,
'type': 'config',
'actions': component,
'args': args,
}

View file

@ -0,0 +1,29 @@
from abc import ABC, abstractmethod
from typing import Callable, Dict, Union
from platypush.config import Config
from . import EntityManager
class ProcedureEntityManager(EntityManager, ABC):
"""
Base class for integrations that can run and manage procedures.
"""
@abstractmethod
def exec(self, procedure: str, *args, **kwargs):
"""
Run a procedure.
:param procedure: Procedure to run, by name.
:param args: Arguments to pass to the procedure.
:param kwargs: Keyword arguments to pass to the procedure.
"""
raise NotImplementedError()
@property
def _all_procedures(self) -> Dict[str, Union[dict, Callable]]:
"""
:return: All the procedures that can be run by this entity manager.
"""
return Config.get_procedures()

View file

@ -0,0 +1,43 @@
import logging
from sqlalchemy import (
Column,
Enum,
ForeignKey,
Integer,
JSON,
String,
)
from platypush.common.db import is_defined
from . import Entity
logger = logging.getLogger(__name__)
if not is_defined('procedure'):
class Procedure(Entity):
"""
Models a procedure entity.
"""
__tablename__ = 'procedure'
id = Column(
Integer, ForeignKey('entity.id', ondelete='CASCADE'), primary_key=True
)
args = Column(JSON, nullable=False, default=[])
procedure_type = Column(
Enum('python', 'config', name='procedure_type'), nullable=False
)
module = Column(String)
source = Column(String)
line = Column(Integer)
actions = Column(JSON)
__table_args__ = {'keep_existing': True}
__mapper_args__ = {
'polymorphic_identity': __tablename__,
}

View file

@ -1,4 +1,5 @@
import json
import os
from platypush import Config
from platypush.message import Message
@ -66,5 +67,12 @@ class ConfigPlugin(Plugin):
"""
return Config._instance.config_file
@action
def get_config_dir(self) -> str:
"""
:return: The path to the configuration directory.
"""
return os.path.dirname(Config._instance.config_file)
# vim:sw=4:ts=4:et:

View file

@ -12,6 +12,7 @@ from platypush.common.db import override_definitions
from platypush.common.reflection import Integration, Message as MessageMetadata
from platypush.config import Config
from platypush.plugins import Plugin, action
from platypush.plugins.procedures import ProcedureEncoder
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.response import Response
@ -20,7 +21,6 @@ from platypush.utils.mock import auto_mocks
from platypush.utils.manifest import Manifest, Manifests, PackageManagers
from ._cache import Cache
from ._serialize import ProcedureEncoder
class InspectPlugin(Plugin):

View file

@ -0,0 +1,117 @@
import json
from dataclasses import dataclass
from typing import Callable, Collection, Optional, Union
from platypush.context import get_plugin
from platypush.entities.managers.procedures import ProcedureEntityManager
from platypush.entities.procedures import Procedure
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.db import DbPlugin
from platypush.utils import run
from ._serialize import ProcedureEncoder
@dataclass
class _ProcedureWrapper:
name: str
obj: Union[dict, Callable]
class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
"""
Utility plugin to run and store procedures as native entities.
"""
@action
def exec(self, procedure: str, *args, **kwargs):
return run(f'procedure.{procedure}', *args, **kwargs)
def _convert_procedure(self, name: str, proc: Union[dict, Callable]) -> Procedure:
metadata = self._serialize_procedure(proc, name=name)
return Procedure(
id=name,
name=name,
plugin=self,
procedure_type=metadata['type'],
module=metadata.get('module'),
source=metadata.get('source'),
line=metadata.get('line'),
args=metadata.get('args', []),
actions=metadata.get('actions', []),
)
@action
def status(self, *_, **__):
"""
:return: The serialized configured procedures. Format:
.. code-block:: json
{
"procedure_name": {
"type": "python",
"module": "module_name",
"source": "/path/to/source.py",
"line": 42,
"args": ["arg1", "arg2"]
}
}
"""
self.publish_entities(self._get_wrapped_procedures())
return self._get_serialized_procedures()
def transform_entities(
self, entities: Collection[_ProcedureWrapper], **_
) -> Collection[Procedure]:
return [
self._convert_procedure(name=proc.name, proc=proc.obj) for proc in entities
]
def _get_wrapped_procedures(self) -> Collection[_ProcedureWrapper]:
return [
_ProcedureWrapper(name=name, obj=proc)
for name, proc in self._all_procedures.items()
]
def _sync_db_procedures(self):
cur_proc_names = set(self._all_procedures.keys())
db: Optional[DbPlugin] = get_plugin(DbPlugin)
assert db, 'No database plugin configured'
with db.get_session(
autoflush=False, autocommit=False, expire_on_commit=False
) as session:
procs_to_remove = (
session.query(Procedure)
.filter(Procedure.name.not_in(cur_proc_names))
.all()
)
for proc in procs_to_remove:
self.logger.info('Removing stale procedure record for %s', proc.name)
session.delete(proc)
@staticmethod
def _serialize_procedure(
proc: Union[dict, Callable], name: Optional[str] = None
) -> dict:
ret = json.loads(json.dumps(proc, cls=ProcedureEncoder))
if name:
ret['name'] = name
return ret
def _get_serialized_procedures(self) -> dict:
return {
name: self._serialize_procedure(proc, name=name)
for name, proc in self._all_procedures.items()
}
def main(self, *_, **__):
self._sync_db_procedures()
while not self.should_stop():
self.publish_entities(self._get_wrapped_procedures())
self.wait_stop()

View file

@ -10,9 +10,10 @@ class ProcedureEncoder(json.JSONEncoder):
def default(self, o):
if callable(o):
return {
'type': 'native_function',
'type': 'python',
'module': o.__module__,
'source': inspect.getsourcefile(o),
'source': getattr(o, "_source", inspect.getsourcefile(o)),
'line': getattr(o, "_line", inspect.getsourcelines(o)[1]),
'args': [
name
for name, arg in inspect.signature(o).parameters.items()

View file

@ -0,0 +1,7 @@
{
"manifest": {
"package": "platypush.plugins.procedure",
"type": "plugin",
"events": []
}
}

View file

@ -565,9 +565,12 @@ def procedure(name_or_func: Optional[str] = None, *upper_args, **upper_kwargs):
"""
Public decorator to mark a function as a procedure.
"""
import inspect
f.procedure = True
f.procedure_name = name
f._source = inspect.getsourcefile(f) # pylint: disable=protected-access
f._line = inspect.getsourcelines(f)[1] # pylint: disable=protected-access
@wraps(f)
def _execute_procedure(*args, **kwargs):