From af21ff13ff963c2454e79cc2f80fc449eb4bdd76 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 13 Aug 2024 22:27:10 +0200 Subject: [PATCH 01/36] [WIP] --- platypush/config/__init__.py | 1 + platypush/entities/_engine/_procedure.py | 7 +++++ platypush/entities/procedures.py | 38 ++++++++++++++++++++++++ platypush/plugins/inspect/_serialize.py | 5 ++-- platypush/procedure/__init__.py | 3 ++ 5 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 platypush/entities/_engine/_procedure.py create mode 100644 platypush/entities/procedures.py diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index c6859e02bb..244c799af4 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -422,6 +422,7 @@ class Config: self.procedures[procedure_name] = { '_async': _async, + 'type': 'config', 'actions': component, 'args': args, } diff --git a/platypush/entities/_engine/_procedure.py b/platypush/entities/_engine/_procedure.py new file mode 100644 index 0000000000..74b566f236 --- /dev/null +++ b/platypush/entities/_engine/_procedure.py @@ -0,0 +1,7 @@ +class ProceduresManager: + """ + This class is responsible for managing the procedures as native entities. + """ + + def __init__(self): + self.procedures = {} diff --git a/platypush/entities/procedures.py b/platypush/entities/procedures.py new file mode 100644 index 0000000000..8cdebba4eb --- /dev/null +++ b/platypush/entities/procedures.py @@ -0,0 +1,38 @@ +import logging + +from sqlalchemy import ( + Column, + Enum, + ForeignKey, + Integer, + JSON, + String, +) + +from platypush.common.db import is_defined + +from . import Entity + +logger = logging.getLogger(__name__) + + +if not is_defined('procedure'): + + class Procedure(Entity): + """ + Models a procedure entity. + """ + + __tablename__ = 'procedure' + + id = Column( + Integer, ForeignKey('entity.id', ondelete='CASCADE'), primary_key=True + ) + name = Column(String, unique=True, nullable=False) + args = Column(JSON, nullable=False, default=[]) + type = Column(Enum('python', 'config', name='procedure_type'), nullable=False) + + __table_args__ = {'keep_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } diff --git a/platypush/plugins/inspect/_serialize.py b/platypush/plugins/inspect/_serialize.py index a9b75258b8..aa9a770316 100644 --- a/platypush/plugins/inspect/_serialize.py +++ b/platypush/plugins/inspect/_serialize.py @@ -10,9 +10,10 @@ class ProcedureEncoder(json.JSONEncoder): def default(self, o): if callable(o): return { - 'type': 'native_function', + 'type': 'python', 'module': o.__module__, - 'source': inspect.getsourcefile(o), + 'source': getattr(o, "_source", inspect.getsourcefile(o)), + 'line': getattr(o, "_line", inspect.getsourcelines(o)[1]), 'args': [ name for name, arg in inspect.signature(o).parameters.items() diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 478b3a64b6..56cdc8f130 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -565,9 +565,12 @@ def procedure(name_or_func: Optional[str] = None, *upper_args, **upper_kwargs): """ Public decorator to mark a function as a procedure. """ + import inspect f.procedure = True f.procedure_name = name + f._source = inspect.getsourcefile(f) # pylint: disable=protected-access + f._line = inspect.getsourcelines(f)[1] # pylint: disable=protected-access @wraps(f) def _execute_procedure(*args, **kwargs): From 06781cd72c8d19f0442a1192516278135285372c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 16:06:56 +0200 Subject: [PATCH 02/36] [#341] Backend implementation of the new `procedure` entities architecture. --- platypush/config/__init__.py | 1 + platypush/entities/_engine/_procedure.py | 7 -- platypush/entities/managers/procedures.py | 29 ++++++ platypush/entities/procedures.py | 8 +- platypush/plugins/inspect/__init__.py | 2 +- platypush/plugins/procedures/__init__.py | 94 +++++++++++++++++++ .../{inspect => procedures}/_serialize.py | 0 platypush/plugins/procedures/manifest.json | 7 ++ 8 files changed, 138 insertions(+), 10 deletions(-) delete mode 100644 platypush/entities/_engine/_procedure.py create mode 100644 platypush/entities/managers/procedures.py create mode 100644 platypush/plugins/procedures/__init__.py rename platypush/plugins/{inspect => procedures}/_serialize.py (100%) create mode 100644 platypush/plugins/procedures/manifest.json diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 244c799af4..945a47af85 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -272,6 +272,7 @@ class Config: @property def _core_plugins(self) -> Dict[str, dict]: return { + 'procedures': {}, 'variable': {}, } diff --git a/platypush/entities/_engine/_procedure.py b/platypush/entities/_engine/_procedure.py deleted file mode 100644 index 74b566f236..0000000000 --- a/platypush/entities/_engine/_procedure.py +++ /dev/null @@ -1,7 +0,0 @@ -class ProceduresManager: - """ - This class is responsible for managing the procedures as native entities. - """ - - def __init__(self): - self.procedures = {} diff --git a/platypush/entities/managers/procedures.py b/platypush/entities/managers/procedures.py new file mode 100644 index 0000000000..50335c9a47 --- /dev/null +++ b/platypush/entities/managers/procedures.py @@ -0,0 +1,29 @@ +from abc import ABC, abstractmethod +from typing import Callable, Dict, Union + +from platypush.config import Config +from . import EntityManager + + +class ProcedureEntityManager(EntityManager, ABC): + """ + Base class for integrations that can run and manage procedures. + """ + + @abstractmethod + def exec(self, procedure: str, *args, **kwargs): + """ + Run a procedure. + + :param procedure: Procedure to run, by name. + :param args: Arguments to pass to the procedure. + :param kwargs: Keyword arguments to pass to the procedure. + """ + raise NotImplementedError() + + @property + def _all_procedures(self) -> Dict[str, Union[dict, Callable]]: + """ + :return: All the procedures that can be run by this entity manager. + """ + return Config.get_procedures() diff --git a/platypush/entities/procedures.py b/platypush/entities/procedures.py index 8cdebba4eb..3609542fef 100644 --- a/platypush/entities/procedures.py +++ b/platypush/entities/procedures.py @@ -28,9 +28,13 @@ if not is_defined('procedure'): id = Column( Integer, ForeignKey('entity.id', ondelete='CASCADE'), primary_key=True ) - name = Column(String, unique=True, nullable=False) args = Column(JSON, nullable=False, default=[]) - type = Column(Enum('python', 'config', name='procedure_type'), nullable=False) + procedure_type = Column( + Enum('python', 'config', name='procedure_type'), nullable=False + ) + module = Column(String) + source = Column(String) + line = Column(Integer) __table_args__ = {'keep_existing': True} __mapper_args__ = { diff --git a/platypush/plugins/inspect/__init__.py b/platypush/plugins/inspect/__init__.py index 1449aab343..15667f3f25 100644 --- a/platypush/plugins/inspect/__init__.py +++ b/platypush/plugins/inspect/__init__.py @@ -12,6 +12,7 @@ from platypush.common.db import override_definitions from platypush.common.reflection import Integration, Message as MessageMetadata from platypush.config import Config from platypush.plugins import Plugin, action +from platypush.plugins.procedure import ProcedureEncoder from platypush.message import Message from platypush.message.event import Event from platypush.message.response import Response @@ -20,7 +21,6 @@ from platypush.utils.mock import auto_mocks from platypush.utils.manifest import Manifest, Manifests, PackageManagers from ._cache import Cache -from ._serialize import ProcedureEncoder class InspectPlugin(Plugin): diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py new file mode 100644 index 0000000000..5c5813e724 --- /dev/null +++ b/platypush/plugins/procedures/__init__.py @@ -0,0 +1,94 @@ +import json +from dataclasses import dataclass +from typing import Callable, Collection, Optional, Union + +from platypush.entities.managers.procedures import ProcedureEntityManager +from platypush.entities.procedures import Procedure +from platypush.plugins import RunnablePlugin, action +from platypush.utils import run + +from ._serialize import ProcedureEncoder + + +@dataclass +class _ProcedureWrapper: + name: str + obj: Union[dict, Callable] + + +class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): + """ + Utility plugin to run and store procedures as native entities. + """ + + @action + def exec(self, procedure: str, *args, **kwargs): + return run(f'procedure.{procedure}', *args, **kwargs) + + def _convert_procedure(self, name: str, proc: Union[dict, Callable]) -> Procedure: + metadata = self._serialize_procedure(proc, name=name) + return Procedure( + id=name, + name=name, + plugin=self, + procedure_type=metadata['type'], + module=metadata.get('module'), + source=metadata.get('source'), + line=metadata.get('line'), + args=metadata.get('args', []), + ) + + @action + def status(self, *_, **__): + """ + :return: The serialized configured procedures. Format: + + .. code-block:: json + + { + "procedure_name": { + "type": "python", + "module": "module_name", + "source": "/path/to/source.py", + "line": 42, + "args": ["arg1", "arg2"] + } + } + + """ + self.publish_entities(self._get_wrapped_procedures()) + return self._get_serialized_procedures() + + def transform_entities( + self, entities: Collection[_ProcedureWrapper], **_ + ) -> Collection[Procedure]: + return [ + self._convert_procedure(name=proc.name, proc=proc.obj) for proc in entities + ] + + def _get_wrapped_procedures(self) -> Collection[_ProcedureWrapper]: + return [ + _ProcedureWrapper(name=name, obj=proc) + for name, proc in self._all_procedures.items() + ] + + @staticmethod + def _serialize_procedure( + proc: Union[dict, Callable], name: Optional[str] = None + ) -> dict: + ret = json.loads(json.dumps(proc, cls=ProcedureEncoder)) + if name: + ret['name'] = name + + return ret + + def _get_serialized_procedures(self) -> dict: + return { + name: self._serialize_procedure(proc, name=name) + for name, proc in self._all_procedures.items() + } + + def main(self, *_, **__): + while not self.should_stop(): + self.publish_entities(self._get_wrapped_procedures()) + self.wait_stop() diff --git a/platypush/plugins/inspect/_serialize.py b/platypush/plugins/procedures/_serialize.py similarity index 100% rename from platypush/plugins/inspect/_serialize.py rename to platypush/plugins/procedures/_serialize.py diff --git a/platypush/plugins/procedures/manifest.json b/platypush/plugins/procedures/manifest.json new file mode 100644 index 0000000000..b489ae96d2 --- /dev/null +++ b/platypush/plugins/procedures/manifest.json @@ -0,0 +1,7 @@ +{ + "manifest": { + "package": "platypush.plugins.procedure", + "type": "plugin", + "events": [] + } +} From 0f186c44ef1baaa5f48f7e5725a68080ef009a14 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 23:45:11 +0200 Subject: [PATCH 03/36] [#341] Support for procedure reconciliation. If some procedures are removed either from the configuration or from the loaded scripts, then their associated entities should also be removed from the database when the `procedures` plugin is loaded. --- platypush/plugins/procedures/__init__.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index 5c5813e724..dfb98042bd 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -2,9 +2,11 @@ import json from dataclasses import dataclass from typing import Callable, Collection, Optional, Union +from platypush.context import get_plugin from platypush.entities.managers.procedures import ProcedureEntityManager from platypush.entities.procedures import Procedure from platypush.plugins import RunnablePlugin, action +from platypush.plugins.db import DbPlugin from platypush.utils import run from ._serialize import ProcedureEncoder @@ -72,6 +74,24 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): for name, proc in self._all_procedures.items() ] + def _sync_db_procedures(self): + cur_proc_names = set(self._all_procedures.keys()) + db: Optional[DbPlugin] = get_plugin(DbPlugin) + assert db, 'No database plugin configured' + + with db.get_session( + autoflush=False, autocommit=False, expire_on_commit=False + ) as session: + procs_to_remove = ( + session.query(Procedure) + .filter(Procedure.name.not_in(cur_proc_names)) + .all() + ) + + for proc in procs_to_remove: + self.logger.info('Removing stale procedure record for %s', proc.name) + session.delete(proc) + @staticmethod def _serialize_procedure( proc: Union[dict, Callable], name: Optional[str] = None @@ -89,6 +109,8 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): } def main(self, *_, **__): + self._sync_db_procedures() + while not self.should_stop(): self.publish_entities(self._get_wrapped_procedures()) self.wait_stop() From ffc3fe218d33e0cc4587082cd152039d4651851f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 28 Aug 2024 00:13:46 +0200 Subject: [PATCH 04/36] [config] Added `config.get_config_dir` method. --- platypush/plugins/config/__init__.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/platypush/plugins/config/__init__.py b/platypush/plugins/config/__init__.py index 2959863f98..0624e6d0b5 100644 --- a/platypush/plugins/config/__init__.py +++ b/platypush/plugins/config/__init__.py @@ -1,4 +1,5 @@ import json +import os from platypush import Config from platypush.message import Message @@ -66,5 +67,12 @@ class ConfigPlugin(Plugin): """ return Config._instance.config_file + @action + def get_config_dir(self) -> str: + """ + :return: The path to the configuration directory. + """ + return os.path.dirname(Config._instance.config_file) + # vim:sw=4:ts=4:et: From 234963b0695127f0f767388e8e178e9879f0ea96 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 28 Aug 2024 00:14:21 +0200 Subject: [PATCH 05/36] =?UTF-8?q?=F0=9F=90=9B=20Fixed=20import=20error?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- platypush/plugins/inspect/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platypush/plugins/inspect/__init__.py b/platypush/plugins/inspect/__init__.py index 15667f3f25..8fafeadd84 100644 --- a/platypush/plugins/inspect/__init__.py +++ b/platypush/plugins/inspect/__init__.py @@ -12,7 +12,7 @@ from platypush.common.db import override_definitions from platypush.common.reflection import Integration, Message as MessageMetadata from platypush.config import Config from platypush.plugins import Plugin, action -from platypush.plugins.procedure import ProcedureEncoder +from platypush.plugins.procedures import ProcedureEncoder from platypush.message import Message from platypush.message.event import Event from platypush.message.response import Response From d9916873cbe5379408fb715f8e11391b399e10ae Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 28 Aug 2024 00:14:54 +0200 Subject: [PATCH 06/36] [procedures] Store `actions` for YAML procedures. --- platypush/entities/procedures.py | 1 + platypush/plugins/procedures/__init__.py | 1 + 2 files changed, 2 insertions(+) diff --git a/platypush/entities/procedures.py b/platypush/entities/procedures.py index 3609542fef..eef3427bbc 100644 --- a/platypush/entities/procedures.py +++ b/platypush/entities/procedures.py @@ -35,6 +35,7 @@ if not is_defined('procedure'): module = Column(String) source = Column(String) line = Column(Integer) + actions = Column(JSON) __table_args__ = {'keep_existing': True} __mapper_args__ = { diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index dfb98042bd..d8892e60c3 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -38,6 +38,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): source=metadata.get('source'), line=metadata.get('line'), args=metadata.get('args', []), + actions=metadata.get('actions', []), ) @action From e593264eab2aa330e17b5d1082e8a6452b0a821f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 30 Aug 2024 02:05:02 +0200 Subject: [PATCH 07/36] [#341] Added `ProcedureType` enum. --- platypush/entities/procedures.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/platypush/entities/procedures.py b/platypush/entities/procedures.py index eef3427bbc..14c6ad14d3 100644 --- a/platypush/entities/procedures.py +++ b/platypush/entities/procedures.py @@ -1,8 +1,9 @@ import logging +from enum import Enum from sqlalchemy import ( Column, - Enum, + Enum as DbEnum, ForeignKey, Integer, JSON, @@ -16,6 +17,12 @@ from . import Entity logger = logging.getLogger(__name__) +class ProcedureType(Enum): + PYTHON = 'python' + CONFIG = 'config' + DB = 'db' + + if not is_defined('procedure'): class Procedure(Entity): @@ -30,7 +37,13 @@ if not is_defined('procedure'): ) args = Column(JSON, nullable=False, default=[]) procedure_type = Column( - Enum('python', 'config', name='procedure_type'), nullable=False + DbEnum( + *[m.value for m in ProcedureType.__members__.values()], + name='procedure_type', + create_constraint=True, + validate_strings=True, + ), + nullable=False, ) module = Column(String) source = Column(String) From 457333929f1a527da9b3b39d3082ff81a64bedd4 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 30 Aug 2024 02:08:42 +0200 Subject: [PATCH 08/36] [#341] Added `procedures.save` and `procedures.delete` actions. --- platypush/message/request/__init__.py | 2 +- platypush/plugins/procedures/__init__.py | 194 +++++++++++++++++++-- platypush/plugins/procedures/_serialize.py | 5 + 3 files changed, 188 insertions(+), 13 deletions(-) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index b470b3dc41..9e4c20db25 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -100,7 +100,7 @@ class Request(Message): proc = Procedure.build( name=proc_name, requests=proc_config['actions'], - _async=proc_config['_async'], + _async=proc_config.get('_async', False), args=self.args, backend=self.backend, id=self.id, diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index d8892e60c3..1b82befcdb 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -1,10 +1,13 @@ +from contextlib import contextmanager import json from dataclasses import dataclass -from typing import Callable, Collection, Optional, Union +from typing import Callable, Collection, Generator, Iterable, Optional, Union + +from sqlalchemy.orm import Session from platypush.context import get_plugin from platypush.entities.managers.procedures import ProcedureEntityManager -from platypush.entities.procedures import Procedure +from platypush.entities.procedures import Procedure, ProcedureType from platypush.plugins import RunnablePlugin, action from platypush.plugins.db import DbPlugin from platypush.utils import run @@ -62,6 +65,158 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): self.publish_entities(self._get_wrapped_procedures()) return self._get_serialized_procedures() + def _update_procedure(self, old: Procedure, new: Procedure, session: Session): + assert old.procedure_type == ProcedureType.DB.value, ( # type: ignore[attr-defined] + f'Procedure {old.name} is not stored in the database, ' + f'it should be removed from the source file: {old.source}' + ) + + old.external_id = new.external_id + old.name = new.name + old.args = new.args + old.actions = new.actions + session.add(old) + + @action + def save( + self, + name: str, + actions: Iterable[dict], + args: Optional[Iterable[str]] = None, + old_name: Optional[str] = None, + **_, + ): + """ + Save a procedure. + + :param name: Name of the procedure. + :param actions: Definition of the actions to be executed. Format: + + .. code-block:: json + + [ + { + "action": "logger.info", + "args": { + "msg": "Hello, world!" + } + } + ] + + :param args: Optional list of arguments to be passed to the procedure, + as a list of strings with the argument names. + :param old_name: Optional old name of the procedure if it's being + renamed. + """ + assert name, 'Procedure name cannot be empty' + assert actions, 'Procedure actions cannot be empty' + assert all( + isinstance(action, dict) and action.get('action') for action in actions + ), 'Procedure actions should be dictionaries with an "action" key' + + args = args or [] + proc_def = { + 'type': ProcedureType.DB.value, + 'name': name, + 'actions': actions, + 'args': args, + } + + existing_proc = None + old_proc = None + new_proc = Procedure( + external_id=name, + plugin=str(self), + procedure_type=ProcedureType.DB.value, + name=name, + actions=actions, + args=args, + ) + + with self._db_session() as session: + if old_name and old_name != name: + old_proc = ( + session.query(Procedure).filter(Procedure.name == old_name).first() + ) + + if old_proc: + self._update_procedure(old=old_proc, new=new_proc, session=session) + else: + self.logger.warning( + 'Procedure %s not found, skipping rename', old_name + ) + + existing_proc = ( + session.query(Procedure).filter(Procedure.name == name).first() + ) + + if existing_proc: + if old_proc: + self._delete(str(existing_proc.name), session=session) + else: + self._update_procedure( + old=existing_proc, new=new_proc, session=session + ) + elif not old_proc: + session.add(new_proc) + + if old_proc: + old_name = str(old_proc.name) + self._all_procedures.pop(old_name, None) + + self._all_procedures[name] = { + **self._all_procedures.get(name, {}), # type: ignore[operator] + **proc_def, + } + + self.status() + + @action + def delete(self, name: str): + """ + Delete a procedure by name. + + Note that this is only possible for procedures that are stored on the + database. Procedures that are loaded from Python scripts or + configuration files should be removed from the source file. + + :param name: Name of the procedure to be deleted. + """ + with self._db_session() as session: + self._delete(name, session=session) + + self.status() + + @contextmanager + def _db_session(self) -> Generator[Session, None, None]: + db: Optional[DbPlugin] = get_plugin(DbPlugin) + assert db, 'No database plugin configured' + with db.get_session( + autoflush=False, autocommit=False, expire_on_commit=False + ) as session: + assert isinstance(session, Session) + yield session + + if session.is_active: + session.commit() + else: + session.rollback() + + def _delete(self, name: str, session: Session): + assert name, 'Procedure name cannot be empty' + proc_row: Procedure = ( + session.query(Procedure).filter(Procedure.name == name).first() + ) + + assert proc_row, f'Procedure {name} not found in the database' + assert proc_row.procedure_type == ProcedureType.DB.value, ( # type: ignore[attr-defined] + f'Procedure {name} is not stored in the database, ' + f'it should be removed from the source file' + ) + + session.delete(proc_row) + self._all_procedures.pop(name, None) + def transform_entities( self, entities: Collection[_ProcedureWrapper], **_ ) -> Collection[Procedure]: @@ -77,22 +232,37 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): def _sync_db_procedures(self): cur_proc_names = set(self._all_procedures.keys()) - db: Optional[DbPlugin] = get_plugin(DbPlugin) - assert db, 'No database plugin configured' + with self._db_session() as session: + saved_procs = { + str(proc.name): proc for proc in session.query(Procedure).all() + } - with db.get_session( - autoflush=False, autocommit=False, expire_on_commit=False - ) as session: - procs_to_remove = ( - session.query(Procedure) - .filter(Procedure.name.not_in(cur_proc_names)) - .all() - ) + procs_to_remove = [ + proc + for name, proc in saved_procs.items() + if name not in cur_proc_names + and proc.procedure_type != ProcedureType.DB.value # type: ignore[attr-defined] + ] for proc in procs_to_remove: self.logger.info('Removing stale procedure record for %s', proc.name) session.delete(proc) + procs_to_add = [ + proc + for name, proc in saved_procs.items() + if proc.procedure_type == ProcedureType.DB.value # type: ignore[attr-defined] + and name not in cur_proc_names + ] + + for proc in procs_to_add: + self._all_procedures[str(proc.name)] = { + 'type': proc.procedure_type, + 'name': proc.name, + 'args': proc.args, + 'actions': proc.actions, + } + @staticmethod def _serialize_procedure( proc: Union[dict, Callable], name: Optional[str] = None diff --git a/platypush/plugins/procedures/_serialize.py b/platypush/plugins/procedures/_serialize.py index aa9a770316..5feb7f9f3e 100644 --- a/platypush/plugins/procedures/_serialize.py +++ b/platypush/plugins/procedures/_serialize.py @@ -8,6 +8,8 @@ class ProcedureEncoder(json.JSONEncoder): """ def default(self, o): + from platypush.entities.procedures import ProcedureType + if callable(o): return { 'type': 'python', @@ -21,4 +23,7 @@ class ProcedureEncoder(json.JSONEncoder): ], } + if isinstance(o, ProcedureType): + return o.value + return super().default(o) From 740e35bd5ea6f60156e525566ebef848403cb1fd Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 31 Aug 2024 21:47:44 +0200 Subject: [PATCH 09/36] Moved full dump of requests+responses+events to log debug level. Messages can be quite big and verbose, and they can anyway be subscribed over Websockets. Full dumps are anyway enabled when Platypush is started in verbose mode. This commit replaces the dumps on INFO level with a quick summary containing the message ID, request/event type and response time. --- platypush/app/_app.py | 8 +++++++- platypush/message/__init__.py | 13 +++++++------ platypush/message/request/__init__.py | 18 ++++++++++++++++-- platypush/message/response/__init__.py | 8 ++++++-- 4 files changed, 36 insertions(+), 11 deletions(-) diff --git a/platypush/app/_app.py b/platypush/app/_app.py index 9bcffdba03..4f16e69c29 100644 --- a/platypush/app/_app.py +++ b/platypush/app/_app.py @@ -365,7 +365,13 @@ class Application: elif isinstance(msg, Response): msg.log() elif isinstance(msg, Event): - msg.log() + log.info( + 'Received event: %s.%s[id=%s]', + msg.__class__.__module__, + msg.__class__.__name__, + msg.id, + ) + msg.log(level=logging.DEBUG) self.event_processor.process_event(msg) return _f diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 65106da2b4..69c9673f71 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -8,7 +8,7 @@ import logging import inspect import json import time -from typing import Union +from typing import Optional, Union from uuid import UUID _logger = logging.getLogger('platypush') @@ -114,18 +114,19 @@ class Message: self._logger = _logger self._default_log_prefix = '' - def log(self, prefix=''): + def log(self, level: Optional[int] = None, prefix=''): if self.logging_level is None: return # Skip logging log_func = self._logger.info - if self.logging_level == logging.DEBUG: + level = level if level is not None else self.logging_level + if level == logging.DEBUG: log_func = self._logger.debug - elif self.logging_level == logging.WARNING: + elif level == logging.WARNING: log_func = self._logger.warning - elif self.logging_level == logging.ERROR: + elif level == logging.ERROR: log_func = self._logger.error - elif self.logging_level == logging.FATAL: + elif level == logging.FATAL: log_func = self._logger.fatal if not prefix: diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 9e4c20db25..7a55e65429 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -165,6 +165,11 @@ class Request(Message): context_value = [*context_value] if isinstance(context_value, datetime.date): context_value = context_value.isoformat() + except NameError as e: + logger.warning( + 'Could not expand expression "%s": %s', inner_expr, e + ) + context_value = expr except Exception as e: logger.exception(e) context_value = expr @@ -188,7 +193,13 @@ class Request(Message): response.id = self.id response.target = self.origin response.origin = Config.get('device_id') - response.log() + self._logger.info( + 'Sending response to request[id=%s, action=%s], response_time=%.02fs', + self.id, + self.action, + response.timestamp - self.timestamp, + ) + response.log(level=logging.DEBUG) if self.backend and self.origin: self.backend.send_response(response=response, request=self) @@ -221,7 +232,10 @@ class Request(Message): from platypush.plugins import RunnablePlugin response = None - self.log() + self._logger.info( + 'Executing request[id=%s, action=%s]', self.id, self.action + ) + self.log(level=logging.DEBUG) try: if self.action.startswith('procedure.'): diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index f1db626e24..2a16a1cb5d 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -1,6 +1,7 @@ import json import logging import time +from typing import Optional from platypush.message import Message @@ -99,8 +100,11 @@ class Response(Message): return json.dumps(response_dict, cls=self.Encoder) - def log(self, *args, **kwargs): - self.logging_level = logging.WARNING if self.is_error() else logging.INFO + def log(self, *args, level: Optional[int] = None, **kwargs): + if level is None: + level = logging.WARNING if self.is_error() else logging.INFO + + kwargs['level'] = level super().log(*args, **kwargs) From a3eedc6adcb0b1c31e9fba9dcd66574aa6325a3d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 31 Aug 2024 21:55:19 +0200 Subject: [PATCH 10/36] [core] Fix support for custom SQLAlchemy engine options on `db` conf. Earlier any extra parameters passed to the `db` configuration other than `engine` where ignored. This enables engine-level configurations such as: ```yaml db: # Display all SQL queries echo: true ``` --- platypush/config/__init__.py | 2 ++ platypush/plugins/db/__init__.py | 23 ++++++++++++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 945a47af85..3eaa7ab5a6 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -179,6 +179,8 @@ class Config: self._config['logging'] = logging_config def _init_db(self, db: Optional[str] = None): + self._config['_db'] = self._config.get('db', {}) + # If the db connection string is passed as an argument, use it if db: self._config['db'] = { diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 5f32258c20..e8ac877665 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -40,8 +40,13 @@ class DbPlugin(Plugin): (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ - super().__init__() - self.engine_url = engine + from platypush.config import Config + + kwargs.update(Config.get('_db', {})) + super().__init__(*args, **kwargs) + self.engine_url = engine or kwargs.pop('engine', None) + self.args = args + self.kwargs = kwargs self.engine = self.get_engine(engine, *args, **kwargs) def get_engine( @@ -50,6 +55,10 @@ class DbPlugin(Plugin): if engine == self.engine_url and self.engine: return self.engine + if not args: + args = self.args + kwargs = {**self.kwargs, **kwargs} + if engine or not self.engine: if isinstance(engine, Engine): return engine @@ -213,7 +222,7 @@ class DbPlugin(Plugin): query = text(query) if table: - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + table, engine = self._get_table(table, *args, engine=engine, **kwargs) query = table.select() if filter: @@ -240,10 +249,10 @@ class DbPlugin(Plugin): self, table, records, + *args, engine=None, key_columns=None, on_duplicate_update=False, - *args, **kwargs, ): """ @@ -310,7 +319,7 @@ class DbPlugin(Plugin): key_columns = [] engine = self.get_engine(engine, *args, **kwargs) - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + table, engine = self._get_table(table, *args, engine=engine, **kwargs) insert_records = records update_records = [] returned_records = [] @@ -454,7 +463,7 @@ class DbPlugin(Plugin): """ engine = self.get_engine(engine, *args, **kwargs) with engine.connect() as connection: - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + table, engine = self._get_table(table, *args, engine=engine, **kwargs) return self._update(connection, table, records, key_columns) @action @@ -498,7 +507,7 @@ class DbPlugin(Plugin): with engine.connect() as connection: for record in records: - table_, engine = self._get_table(table, engine=engine, *args, **kwargs) + table_, engine = self._get_table(table, *args, engine=engine, **kwargs) delete = table_.delete() for k, v in record.items(): From 7cc7009d08fa4f83f00bd1e879ccd1dab9406610 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 1 Sep 2024 01:04:12 +0200 Subject: [PATCH 11/36] [db] Always run `PRAGMA foreign_keys = ON` on SQLite connections. This is the default behaviour on basically any other supported RDBMS. --- platypush/plugins/db/__init__.py | 8 +++++++- platypush/plugins/entities/__init__.py | 7 +------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index e8ac877665..0e4b84a9a1 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -533,13 +533,19 @@ class DbPlugin(Plugin): with lock, engine.connect() as conn: session_maker = scoped_session( sessionmaker( - expire_on_commit=False, + expire_on_commit=kwargs.get('expire_on_commit', False), autoflush=autoflush, ) ) session_maker.configure(bind=conn) session = session_maker() + + if str(session.connection().engine.url).startswith('sqlite://'): + # SQLite requires foreign_keys to be explicitly enabled + # in order to proper manage cascade deletions + session.execute(text('PRAGMA foreign_keys = ON')) + yield session session.flush() diff --git a/platypush/plugins/entities/__init__.py b/platypush/plugins/entities/__init__.py index a21181d710..b3ef372f3f 100644 --- a/platypush/plugins/entities/__init__.py +++ b/platypush/plugins/entities/__init__.py @@ -4,7 +4,7 @@ from time import time from traceback import format_exception from typing import Optional, Any, Collection, Mapping -from sqlalchemy import or_, text +from sqlalchemy import or_ from sqlalchemy.orm import make_transient, Session from platypush.config import Config @@ -206,11 +206,6 @@ class EntitiesPlugin(Plugin): :return: The payload of the deleted entities. """ with self._get_session(locked=True) as session: - if str(session.connection().engine.url).startswith('sqlite://'): - # SQLite requires foreign_keys to be explicitly enabled - # in order to proper manage cascade deletions - session.execute(text('PRAGMA foreign_keys = ON')) - entities: Collection[Entity] = ( session.query(Entity).filter(Entity.id.in_(entities)).all() ) From 9d086a4a1096f36823ade290451c7e92628ad275 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 1 Sep 2024 01:34:58 +0200 Subject: [PATCH 12/36] [system] Don't use `is_defined` macro for system plugin entities. It seems to clash with something and cause plugin actions to return random `ImportError`. --- platypush/entities/system.py | 436 ++++++++++++--------------- platypush/plugins/system/__init__.py | 14 +- 2 files changed, 206 insertions(+), 244 deletions(-) diff --git a/platypush/entities/system.py b/platypush/entities/system.py index 0421399adb..0ed543185e 100644 --- a/platypush/entities/system.py +++ b/platypush/entities/system.py @@ -1,289 +1,249 @@ from sqlalchemy import Boolean, Column, Float, ForeignKey, Integer, JSON, String -from platypush.common.db import is_defined - from . import Entity from .devices import Device from .sensors import NumericSensor, PercentSensor from .temperature import TemperatureSensor -if not is_defined('cpu'): +class Cpu(Entity): + """ + ``CPU`` ORM (container) model. + """ - class Cpu(Entity): - """ - ``CPU`` ORM (container) model. - """ + __tablename__ = 'cpu' - __tablename__ = 'cpu' + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + percent = Column(Float) - percent = Column(Float) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } +class CpuInfo(Entity): + """ + ``CpuInfo`` ORM model. + """ -if not is_defined('cpu_info'): + __tablename__ = 'cpu_info' - class CpuInfo(Entity): - """ - ``CpuInfo`` ORM model. - """ + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - __tablename__ = 'cpu_info' + architecture = Column(String) + bits = Column(Integer) + cores = Column(Integer) + vendor = Column(String) + brand = Column(String) + frequency_advertised = Column(Integer) + frequency_actual = Column(Integer) + flags = Column(JSON) + l1_instruction_cache_size = Column(Integer) + l1_data_cache_size = Column(Integer) + l2_cache_size = Column(Integer) + l3_cache_size = Column(Integer) - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - architecture = Column(String) - bits = Column(Integer) - cores = Column(Integer) - vendor = Column(String) - brand = Column(String) - frequency_advertised = Column(Integer) - frequency_actual = Column(Integer) - flags = Column(JSON) - l1_instruction_cache_size = Column(Integer) - l1_data_cache_size = Column(Integer) - l2_cache_size = Column(Integer) - l3_cache_size = Column(Integer) - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } +class CpuTimes(Entity): + """ + ``CpuTimes`` ORM (container) model. + """ + __tablename__ = 'cpu_times' -if not is_defined('cpu_times'): + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - class CpuTimes(Entity): - """ - ``CpuTimes`` ORM (container) model. - """ + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - __tablename__ = 'cpu_times' - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) +class CpuStats(Entity): + """ + ``CpuStats`` ORM (container) model. + """ - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + __tablename__ = 'cpu_stats' + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) -if not is_defined('cpu_stats'): + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - class CpuStats(Entity): - """ - ``CpuStats`` ORM (container) model. - """ - __tablename__ = 'cpu_stats' +class MemoryStats(Entity): + """ + ``MemoryStats`` ORM model. + """ - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + __tablename__ = 'memory_stats' - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) + total = Column(Integer) + available = Column(Integer) + used = Column(Integer) + free = Column(Integer) + active = Column(Integer) + inactive = Column(Integer) + buffers = Column(Integer) + cached = Column(Integer) + shared = Column(Integer) + percent = Column(Float) -if not is_defined('memory_stats'): + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - class MemoryStats(Entity): - """ - ``MemoryStats`` ORM model. - """ - __tablename__ = 'memory_stats' +class SwapStats(Entity): + """ + ``SwapStats`` ORM model. + """ - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + __tablename__ = 'swap_stats' - total = Column(Integer) - available = Column(Integer) - used = Column(Integer) - free = Column(Integer) - active = Column(Integer) - inactive = Column(Integer) - buffers = Column(Integer) - cached = Column(Integer) - shared = Column(Integer) - percent = Column(Float) + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + total = Column(Integer) + used = Column(Integer) + free = Column(Integer) + percent = Column(Float) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } -if not is_defined('swap_stats'): - class SwapStats(Entity): - """ - ``SwapStats`` ORM model. - """ +class Disk(Entity): + """ + ``Disk`` ORM model. + """ - __tablename__ = 'swap_stats' + __tablename__ = 'disk' - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) - - total = Column(Integer) - used = Column(Integer) - free = Column(Integer) - percent = Column(Float) + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + mountpoint = Column(String) + fstype = Column(String) + opts = Column(String) + total = Column(Integer) + used = Column(Integer) + free = Column(Integer) + percent = Column(Float) + read_count = Column(Integer) + write_count = Column(Integer) + read_bytes = Column(Integer) + write_bytes = Column(Integer) + read_time = Column(Float) + write_time = Column(Float) + busy_time = Column(Float) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } -if not is_defined('disk'): - class Disk(Entity): - """ - ``Disk`` ORM model. - """ +class NetworkInterface(Device): + """ + ``NetworkInterface`` ORM model. + """ - __tablename__ = 'disk' + __tablename__ = 'network_interface' - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + id = Column(Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True) - mountpoint = Column(String) - fstype = Column(String) - opts = Column(String) - total = Column(Integer) - used = Column(Integer) - free = Column(Integer) - percent = Column(Float) - read_count = Column(Integer) - write_count = Column(Integer) - read_bytes = Column(Integer) - write_bytes = Column(Integer) - read_time = Column(Float) - write_time = Column(Float) - busy_time = Column(Float) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('network_interface'): - - class NetworkInterface(Device): - """ - ``NetworkInterface`` ORM model. - """ - - __tablename__ = 'network_interface' - - id = Column( - Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True - ) - - bytes_sent = Column(Integer) - bytes_recv = Column(Integer) - packets_sent = Column(Integer) - packets_recv = Column(Integer) - errors_in = Column(Integer) - errors_out = Column(Integer) - drop_in = Column(Integer) - drop_out = Column(Integer) - addresses = Column(JSON) - speed = Column(Integer) - mtu = Column(Integer) - duplex = Column(String) - flags = Column(JSON) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('system_temperature'): - - class SystemTemperature(TemperatureSensor): - """ - Extends the ``TemperatureSensor``. - """ - - __tablename__ = 'system_temperature' - - id = Column( - Integer, - ForeignKey(TemperatureSensor.id, ondelete='CASCADE'), - primary_key=True, - ) - - high = Column(Float) - critical = Column(Float) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('system_fan'): - - class SystemFan(NumericSensor): - """ - ``SystemFan`` ORM model. - """ - - __tablename__ = 'system_fan' - - id = Column( - Integer, - ForeignKey(NumericSensor.id, ondelete='CASCADE'), - primary_key=True, - ) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('system_battery'): - - class SystemBattery(PercentSensor): - """ - ``SystemBattery`` ORM model. - """ - - __tablename__ = 'system_battery' - - id = Column( - Integer, - ForeignKey(PercentSensor.id, ondelete='CASCADE'), - primary_key=True, - ) - - seconds_left = Column(Float) - power_plugged = Column(Boolean) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + bytes_sent = Column(Integer) + bytes_recv = Column(Integer) + packets_sent = Column(Integer) + packets_recv = Column(Integer) + errors_in = Column(Integer) + errors_out = Column(Integer) + drop_in = Column(Integer) + drop_out = Column(Integer) + addresses = Column(JSON) + speed = Column(Integer) + mtu = Column(Integer) + duplex = Column(String) + flags = Column(JSON) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + +class SystemTemperature(TemperatureSensor): + """ + Extends the ``TemperatureSensor``. + """ + + __tablename__ = 'system_temperature' + + id = Column( + Integer, + ForeignKey(TemperatureSensor.id, ondelete='CASCADE'), + primary_key=True, + ) + + high = Column(Float) + critical = Column(Float) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + +class SystemFan(NumericSensor): + """ + ``SystemFan`` ORM model. + """ + + __tablename__ = 'system_fan' + + id = Column( + Integer, + ForeignKey(NumericSensor.id, ondelete='CASCADE'), + primary_key=True, + ) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + +class SystemBattery(PercentSensor): + """ + ``SystemBattery`` ORM model. + """ + + __tablename__ = 'system_battery' + + id = Column( + Integer, + ForeignKey(PercentSensor.id, ondelete='CASCADE'), + primary_key=True, + ) + + seconds_left = Column(Float) + power_plugged = Column(Boolean) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } diff --git a/platypush/plugins/system/__init__.py b/platypush/plugins/system/__init__.py index 924149b18d..5156681755 100644 --- a/platypush/plugins/system/__init__.py +++ b/platypush/plugins/system/__init__.py @@ -634,13 +634,15 @@ class SystemPlugin(SensorPlugin, EntityManager): if fan.get('id') and fan.get('label') ], *[ - SystemBattery( - id='system:battery', - name='Battery', - **battery, + ( + SystemBattery( + id='system:battery', + name='Battery', + **battery, + ) + if battery + else () ) - if battery - else () ], ] From 13698481147434b854b388c42ec6363eb66ee6f9 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 1 Sep 2024 01:47:39 +0200 Subject: [PATCH 13/36] [#341] More `procedures` features. - `procedures.exec` now supports running procedures "on the fly" given a definition with a list of actions. - Fixed procedure renaming/overwrite logic. - Access to `_all_procedures` should always be guarded by a lock. --- platypush/plugins/procedures/__init__.py | 224 +++++++++++++---------- 1 file changed, 124 insertions(+), 100 deletions(-) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index 1b82befcdb..e536201737 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -1,6 +1,8 @@ -from contextlib import contextmanager import json +from contextlib import contextmanager from dataclasses import dataclass +from multiprocessing import RLock +from random import randint from typing import Callable, Collection, Generator, Iterable, Optional, Union from sqlalchemy.orm import Session @@ -8,6 +10,7 @@ from sqlalchemy.orm import Session from platypush.context import get_plugin from platypush.entities.managers.procedures import ProcedureEntityManager from platypush.entities.procedures import Procedure, ProcedureType +from platypush.message.event.entities import EntityDeleteEvent from platypush.plugins import RunnablePlugin, action from platypush.plugins.db import DbPlugin from platypush.utils import run @@ -26,11 +29,60 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): Utility plugin to run and store procedures as native entities. """ - @action - def exec(self, procedure: str, *args, **kwargs): - return run(f'procedure.{procedure}', *args, **kwargs) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._status_lock = RLock() + + @action + def exec(self, procedure: Union[str, dict], *args, **kwargs): + """ + Execute a procedure. + + :param procedure: Procedure name or definition. If a string is passed, + then the procedure will be looked up by name in the configured + procedures. If a dictionary is passed, then it should be a valid + procedure definition with at least the ``actions`` key. + :param args: Optional arguments to be passed to the procedure. + :param kwargs: Optional arguments to be passed to the procedure. + """ + if isinstance(procedure, str): + return run(f'procedure.{procedure}', *args, **kwargs) + + assert isinstance(procedure, dict), 'Invalid procedure definition' + procedure_name = procedure.get( + 'name', f'procedure_{f"{randint(0, 1 << 32):08x}"}' + ) + + actions = procedure.get('actions') + assert actions and isinstance( + actions, (list, tuple, set) + ), 'Procedure definition should have at least the "actions" key as a list of actions' + + try: + # Create a temporary procedure definition and execute it + self._all_procedures[procedure_name] = { + 'name': procedure_name, + 'type': ProcedureType.CONFIG.value, + 'actions': list(actions), + 'args': procedure.get('args', []), + '_async': False, + } + + kwargs = { + **procedure.get('args', {}), + **kwargs, + } + + return self.exec(procedure_name, *args, **kwargs) + finally: + self._all_procedures.pop(procedure_name, None) + + def _convert_procedure( + self, name: str, proc: Union[dict, Callable, Procedure] + ) -> Procedure: + if isinstance(proc, Procedure): + return proc - def _convert_procedure(self, name: str, proc: Union[dict, Callable]) -> Procedure: metadata = self._serialize_procedure(proc, name=name) return Procedure( id=name, @@ -45,8 +97,15 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ) @action - def status(self, *_, **__): + def status(self, *_, publish: bool = True, **__): """ + :param publish: If set to True (default) then the + :class:`platypush.message.event.entities.EntityUpdateEvent` events + will be published to the bus with the current configured procedures. + Usually this should be set to True, unless you're calling this method + from a context where you first want to retrieve the procedures and + then immediately modify them. In such cases, the published events may + result in race conditions on the entities engine. :return: The serialized configured procedures. Format: .. code-block:: json @@ -62,20 +121,11 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): } """ - self.publish_entities(self._get_wrapped_procedures()) - return self._get_serialized_procedures() + with self._status_lock: + if publish: + self.publish_entities(self._get_wrapped_procedures()) - def _update_procedure(self, old: Procedure, new: Procedure, session: Session): - assert old.procedure_type == ProcedureType.DB.value, ( # type: ignore[attr-defined] - f'Procedure {old.name} is not stored in the database, ' - f'it should be removed from the source file: {old.source}' - ) - - old.external_id = new.external_id - old.name = new.name - old.args = new.args - old.actions = new.actions - session.add(old) + return self._get_serialized_procedures() @action def save( @@ -115,61 +165,30 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ), 'Procedure actions should be dictionaries with an "action" key' args = args or [] - proc_def = { - 'type': ProcedureType.DB.value, + proc_args = { 'name': name, + 'type': ProcedureType.DB.value, 'actions': actions, 'args': args, } - existing_proc = None - old_proc = None - new_proc = Procedure( - external_id=name, - plugin=str(self), - procedure_type=ProcedureType.DB.value, - name=name, - actions=actions, - args=args, - ) + with self._status_lock: + with self._db_session() as session: + if old_name and old_name != name: + try: + self._delete(old_name, session=session) + except AssertionError as e: + self.logger.warning( + 'Error while deleting old procedure: name=%s: %s', + old_name, + e, + ) - with self._db_session() as session: - if old_name and old_name != name: - old_proc = ( - session.query(Procedure).filter(Procedure.name == old_name).first() - ) + self._all_procedures[name] = proc_args - if old_proc: - self._update_procedure(old=old_proc, new=new_proc, session=session) - else: - self.logger.warning( - 'Procedure %s not found, skipping rename', old_name - ) + self.publish_entities([_ProcedureWrapper(name=name, obj=proc_args)]) - existing_proc = ( - session.query(Procedure).filter(Procedure.name == name).first() - ) - - if existing_proc: - if old_proc: - self._delete(str(existing_proc.name), session=session) - else: - self._update_procedure( - old=existing_proc, new=new_proc, session=session - ) - elif not old_proc: - session.add(new_proc) - - if old_proc: - old_name = str(old_proc.name) - self._all_procedures.pop(old_name, None) - - self._all_procedures[name] = { - **self._all_procedures.get(name, {}), # type: ignore[operator] - **proc_def, - } - - self.status() + return self.status() @action def delete(self, name: str): @@ -191,9 +210,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): def _db_session(self) -> Generator[Session, None, None]: db: Optional[DbPlugin] = get_plugin(DbPlugin) assert db, 'No database plugin configured' - with db.get_session( - autoflush=False, autocommit=False, expire_on_commit=False - ) as session: + with db.get_session(locked=True) as session: assert isinstance(session, Session) yield session @@ -216,12 +233,17 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): session.delete(proc_row) self._all_procedures.pop(name, None) + self._bus.post(EntityDeleteEvent(plugin=self, entity=proc_row)) def transform_entities( self, entities: Collection[_ProcedureWrapper], **_ ) -> Collection[Procedure]: return [ - self._convert_procedure(name=proc.name, proc=proc.obj) for proc in entities + self._convert_procedure( + name=proc.name, + proc=proc if isinstance(proc, Procedure) else proc.obj, + ) + for proc in entities ] def _get_wrapped_procedures(self) -> Collection[_ProcedureWrapper]: @@ -231,38 +253,40 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ] def _sync_db_procedures(self): - cur_proc_names = set(self._all_procedures.keys()) - with self._db_session() as session: - saved_procs = { - str(proc.name): proc for proc in session.query(Procedure).all() - } - - procs_to_remove = [ - proc - for name, proc in saved_procs.items() - if name not in cur_proc_names - and proc.procedure_type != ProcedureType.DB.value # type: ignore[attr-defined] - ] - - for proc in procs_to_remove: - self.logger.info('Removing stale procedure record for %s', proc.name) - session.delete(proc) - - procs_to_add = [ - proc - for name, proc in saved_procs.items() - if proc.procedure_type == ProcedureType.DB.value # type: ignore[attr-defined] - and name not in cur_proc_names - ] - - for proc in procs_to_add: - self._all_procedures[str(proc.name)] = { - 'type': proc.procedure_type, - 'name': proc.name, - 'args': proc.args, - 'actions': proc.actions, + with self._status_lock: + cur_proc_names = set(self._all_procedures.keys()) + with self._db_session() as session: + saved_procs = { + str(proc.name): proc for proc in session.query(Procedure).all() } + procs_to_remove = [ + proc + for name, proc in saved_procs.items() + if name not in cur_proc_names + and proc.procedure_type != ProcedureType.DB.value # type: ignore[attr-defined] + ] + + for proc in procs_to_remove: + self.logger.info( + 'Removing stale procedure record for %s', proc.name + ) + session.delete(proc) + + procs_to_add = [ + proc + for proc in saved_procs.values() + if proc.procedure_type == ProcedureType.DB.value # type: ignore[attr-defined] + ] + + for proc in procs_to_add: + self._all_procedures[str(proc.name)] = { + 'type': proc.procedure_type, + 'name': proc.name, + 'args': proc.args, + 'actions': proc.actions, + } + @staticmethod def _serialize_procedure( proc: Union[dict, Callable], name: Optional[str] = None From 861e7e7c52bdf7a679e79edf14c6df5501248454 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 1 Sep 2024 18:13:06 +0200 Subject: [PATCH 14/36] [#341] More improvements on the `procedures` plugin. - `procedures.status` should always sync with the db to ensure that the action returns the most up-to-date version of the procedures. - Store, return and propagate entity procedure metadata. --- platypush/plugins/procedures/__init__.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index e536201737..dde599c018 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -94,6 +94,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): line=metadata.get('line'), args=metadata.get('args', []), actions=metadata.get('actions', []), + meta=metadata.get('meta', {}), ) @action @@ -122,6 +123,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): """ with self._status_lock: + self._sync_db_procedures() if publish: self.publish_entities(self._get_wrapped_procedures()) @@ -134,6 +136,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): actions: Iterable[dict], args: Optional[Iterable[str]] = None, old_name: Optional[str] = None, + meta: Optional[dict] = None, **_, ): """ @@ -157,6 +160,17 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): as a list of strings with the argument names. :param old_name: Optional old name of the procedure if it's being renamed. + :param meta: Optional metadata to be stored with the procedure. Example: + + .. code-block:: json + + { + "icon": { + "class": "fas fa-cogs", + "color": "#00ff00" + } + } + """ assert name, 'Procedure name cannot be empty' assert actions, 'Procedure actions cannot be empty' @@ -165,11 +179,15 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ), 'Procedure actions should be dictionaries with an "action" key' args = args or [] + proc_def = self._all_procedures.get(name, {}) proc_args = { 'name': name, 'type': ProcedureType.DB.value, 'actions': actions, 'args': args, + 'meta': ( + meta or (proc_def.get('meta', {}) if isinstance(proc_def, dict) else {}) + ), } with self._status_lock: @@ -285,6 +303,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): 'name': proc.name, 'args': proc.args, 'actions': proc.actions, + 'meta': proc.meta, } @staticmethod From e01782c3443d1b0d77f1272d29df72345cee5dda Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 15/36] An empty commit to re-trigger the CI/CD pipelines From b53c4c5c18242f322109444c2da40dea14c61852 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 16/36] An empty commit to re-trigger the CI/CD pipelines From c68cf4b58551152cbc344815ddec20eb78ff728f Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 17/36] An empty commit to re-trigger the CI/CD pipelines From 5061c5290cbbc7f67439c90c3ffd61136eac967c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 18/36] An empty commit to re-trigger the CI/CD pipelines From 9aff704444c15199d16a74074ac398acd2c763be Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 19/36] An empty commit to re-trigger the CI/CD pipelines From bca340ebc18101a05653801dad292e377262cdfc Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 20/36] An empty commit to re-trigger the CI/CD pipelines From 981ae3479e83af9980905e45643747c900624428 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 25 Aug 2024 03:20:38 +0200 Subject: [PATCH 21/36] An empty commit to re-trigger the CI/CD pipelines From 90a953b7380c87ca160e905a0b9889b8a0e15177 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 13 Aug 2024 22:27:10 +0200 Subject: [PATCH 22/36] [WIP] --- platypush/entities/_engine/_procedure.py | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 platypush/entities/_engine/_procedure.py diff --git a/platypush/entities/_engine/_procedure.py b/platypush/entities/_engine/_procedure.py new file mode 100644 index 0000000000..74b566f236 --- /dev/null +++ b/platypush/entities/_engine/_procedure.py @@ -0,0 +1,7 @@ +class ProceduresManager: + """ + This class is responsible for managing the procedures as native entities. + """ + + def __init__(self): + self.procedures = {} From 26f491025a03d091467efc081f7d0c2f16c36aae Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 2 Sep 2024 02:20:39 +0200 Subject: [PATCH 23/36] [#341] Improvements on `procedures.save`. - Update the cached representation of the procedure asynchronously on the `publish_entities` callback. This prevents stale records from being loaded from the db before the entities engine has persisted the new ones. - Don't re-publish all entities when calling `procedures.status` at the end of `procedures.save`. This is both for performance reasons and to avoid sending to the entities engine stale representation of the data. --- platypush/plugins/procedures/__init__.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index dde599c018..7b6aa4f36c 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -190,6 +190,9 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ), } + def _on_entity_saved(*_, **__): + self._all_procedures[name] = proc_args + with self._status_lock: with self._db_session() as session: if old_name and old_name != name: @@ -202,11 +205,12 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): e, ) - self._all_procedures[name] = proc_args + self.publish_entities( + [_ProcedureWrapper(name=name, obj=proc_args)], + callback=_on_entity_saved, + ) - self.publish_entities([_ProcedureWrapper(name=name, obj=proc_args)]) - - return self.status() + return self.status(publish=False) @action def delete(self, name: str): From c5c872eb68c9eb85444fae3ba49b2a2728e03ea0 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 2 Sep 2024 02:27:33 +0200 Subject: [PATCH 24/36] [chore] Removed unused file re-added upon rebase. --- platypush/entities/_engine/_procedure.py | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 platypush/entities/_engine/_procedure.py diff --git a/platypush/entities/_engine/_procedure.py b/platypush/entities/_engine/_procedure.py deleted file mode 100644 index 74b566f236..0000000000 --- a/platypush/entities/_engine/_procedure.py +++ /dev/null @@ -1,7 +0,0 @@ -class ProceduresManager: - """ - This class is responsible for managing the procedures as native entities. - """ - - def __init__(self): - self.procedures = {} From e39e36e5f6bc6852fd102ab5f9d9cec79ae752e4 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 2 Sep 2024 02:31:11 +0200 Subject: [PATCH 25/36] [CI/CD] A more resilient `github-mirror` script. - Fail immediately if no branches are checked out. - Rebase only if we're pushing on master (don't bother for feature branches). - Do a push force to Github. --- .drone/github-mirror.sh | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/.drone/github-mirror.sh b/.drone/github-mirror.sh index 20832d4b51..4727946c85 100755 --- a/.drone/github-mirror.sh +++ b/.drone/github-mirror.sh @@ -6,9 +6,18 @@ ssh-keyscan github.com >> ~/.ssh/known_hosts 2>/dev/null # Clone the repository +branch=$(git rev-parse --abbrev-ref HEAD) +if [ -z "${branch}" ]; then + echo "No branch checked out" + exit 1 +fi + git remote add github git@github.com:/blacklight/platypush.git -git pull --rebase github "$(git branch | head -1 | awk '{print $2}')" || echo "No such branch on Github" + +if (( "$branch" == "master" )); then + git pull --rebase github "${branch}" || echo "No such branch on Github" +fi # Push the changes to the GitHub mirror -git push --all -v github +git push -f --all -v github git push --tags -v github From c54269e3d253f9f64bf20aa4528cac885dfef4c4 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 5 Sep 2024 01:13:30 +0200 Subject: [PATCH 26/36] [#341] Added utility `procedures.to_yaml` action. --- platypush/plugins/procedures/__init__.py | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index 7b6aa4f36c..7bea727bc2 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -1,10 +1,12 @@ import json +import re from contextlib import contextmanager from dataclasses import dataclass from multiprocessing import RLock from random import randint from typing import Callable, Collection, Generator, Iterable, Optional, Union +import yaml from sqlalchemy.orm import Session from platypush.context import get_plugin @@ -228,6 +230,57 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): self.status() + @action + def to_yaml(self, procedure: Union[str, dict]) -> str: + """ + Serialize a procedure to YAML. + + This method is useful to export a procedure to a file. + Note that it only works with either YAML-based procedures or + database-stored procedures: Python procedures can't be converted to + YAML. + + :param procedure: Procedure name or definition. If a string is passed, + then the procedure will be looked up by name in the configured + procedures. If a dictionary is passed, then it should be a valid + procedure definition with at least the ``actions`` and ``name`` + keys. + :return: The serialized procedure in YAML format. + """ + if isinstance(procedure, str): + proc = self._all_procedures.get(procedure) + assert proc, f'Procedure {proc} not found' + elif isinstance(procedure, dict): + name = self._normalize_name(procedure.get('name')) + assert name, 'Procedure name cannot be empty' + + actions = procedure.get('actions', []) + assert actions and isinstance( + actions, (list, tuple, set) + ), 'Procedure definition should have at least the "actions" key as a list of actions' + + args = [self._normalize_name(arg) for arg in procedure.get('args', [])] + proc = { + f'procedure.{name}' + + (f'({", ".join(args)})' if args else ''): [ + { + 'action': action['action'], + **({'args': action['args']} if action.get('args') else {}), + } + for action in actions + ] + } + else: + raise AssertionError( + f'Invalid procedure definition with type {type(procedure)}' + ) + + return yaml.safe_dump(proc, default_flow_style=False, indent=2) + + @staticmethod + def _normalize_name(name: Optional[str]) -> str: + return re.sub(r'[^\w.]+', '_', (name or '').strip(' .')) + @contextmanager def _db_session(self) -> Generator[Session, None, None]: db: Optional[DbPlugin] = get_plugin(DbPlugin) From f18d0d8b74ec958c29452fe45581019880263b88 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 6 Sep 2024 11:53:35 +0200 Subject: [PATCH 27/36] [procedures] Recursive serialization in `procedures.to_yaml`. --- platypush/plugins/procedures/__init__.py | 37 ++++++++++++++++++++---- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index 7bea727bc2..db3d142710 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -4,7 +4,16 @@ from contextlib import contextmanager from dataclasses import dataclass from multiprocessing import RLock from random import randint -from typing import Callable, Collection, Generator, Iterable, Optional, Union +from typing import ( + Callable, + Collection, + Dict, + Generator, + Iterable, + List, + Optional, + Union, +) import yaml from sqlalchemy.orm import Session @@ -263,11 +272,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): proc = { f'procedure.{name}' + (f'({", ".join(args)})' if args else ''): [ - { - 'action': action['action'], - **({'args': action['args']} if action.get('args') else {}), - } - for action in actions + self._serialize_action(action) for action in actions ] } else: @@ -281,6 +286,26 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): def _normalize_name(name: Optional[str]) -> str: return re.sub(r'[^\w.]+', '_', (name or '').strip(' .')) + @classmethod + def _serialize_action(cls, data: Union[Iterable, Dict]) -> Union[Dict, List]: + if isinstance(data, dict): + if data.get('action'): + return { + 'action': data['action'], + **({'args': data['args']} if data.get('args') else {}), + } + + return { + k: ( + cls._serialize_action(v) + if isinstance(v, (dict, list, tuple)) + else v + ) + for k, v in data.items() + } + else: + return [cls._serialize_action(item) for item in data if item is not None] + @contextmanager def _db_session(self) -> Generator[Session, None, None]: db: Optional[DbPlugin] = get_plugin(DbPlugin) From 05b1fcd43a1c0a2429683d3bf77d8f7faf1d4119 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 6 Sep 2024 11:55:03 +0200 Subject: [PATCH 28/36] [procedures] Don't validate the presence of the `actions` field in `procedures.save`. When saving procedures with if/else/for blocks, some blocks aren't supposed to have the `actions` field. --- platypush/plugins/procedures/__init__.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index db3d142710..3e66f7631e 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -185,9 +185,6 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): """ assert name, 'Procedure name cannot be empty' assert actions, 'Procedure actions cannot be empty' - assert all( - isinstance(action, dict) and action.get('action') for action in actions - ), 'Procedure actions should be dictionaries with an "action" key' args = args or [] proc_def = self._all_procedures.get(name, {}) From 5a7068501a63ceb3e89b28b5313fbc55ff5fa9e7 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 10 Sep 2024 19:49:16 +0200 Subject: [PATCH 29/36] [request] The action name can be specified either on `action` or `name`. This is for UI compatibility purposes. --- platypush/message/request/__init__.py | 3 ++- platypush/plugins/procedures/__init__.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 7a55e65429..8dda18eb7d 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -64,12 +64,13 @@ class Request(Message): msg = super().parse(msg) args = { 'target': msg.get('target', Config.get('device_id')), - 'action': msg['action'], + 'action': msg.get('action', msg.get('name')), 'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(), 'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(), } + assert args.get('action'), 'No action specified in the request' if 'origin' in msg: args['origin'] = msg['origin'] if 'token' in msg: diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index 3e66f7631e..cffd2f65f1 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -286,9 +286,10 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): @classmethod def _serialize_action(cls, data: Union[Iterable, Dict]) -> Union[Dict, List]: if isinstance(data, dict): - if data.get('action'): + name = data.get('action', data.get('name')) + if name: return { - 'action': data['action'], + 'action': name, **({'args': data['args']} if data.get('args') else {}), } From 946c7b1783f9c594c685cb9680102c015ff54552 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 10 Sep 2024 19:53:14 +0200 Subject: [PATCH 30/36] [procedure] Ignore `id` field in `Procedure.build`. The reason is that an `id` specified on procedure level will be applied to all the child requests. This means that the first response from the first completed request will be sent to Redis and mistakenly interpreted by HTTP listeners as the return value of the whole procedure. `Procedure.build` should instead calculate its own ID for the procedure, and apply different IDs to the child requests. --- platypush/procedure/__init__.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 56cdc8f130..50ac5bb49c 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -55,7 +55,6 @@ class Procedure: requests, args=None, backend=None, - id=None, # pylint: disable=redefined-builtin procedure_class=None, **kwargs, ): @@ -66,6 +65,7 @@ class Procedure: if_config = LifoQueue() procedure_class = procedure_class or cls key = None + kwargs.pop('id', None) for request_config in requests: # Check if it's a break/continue/return statement @@ -91,7 +91,6 @@ class Procedure: 'condition': condition, 'else_branch': [], 'backend': backend, - 'id': id, } ) @@ -132,7 +131,6 @@ class Procedure: _async=_async, requests=request_config[key], backend=backend, - id=id, iterator_name=iterator_name, iterable=iterable, ) @@ -156,14 +154,12 @@ class Procedure: requests=request_config[key], condition=condition, backend=backend, - id=id, ) reqs.append(loop) continue request_config['origin'] = Config.get('device_id') - request_config['id'] = id if 'target' not in request_config: request_config['target'] = request_config['origin'] From 1e9f7fb2c68cfb78e38c992af0de6faf1eb15515 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 10 Sep 2024 19:55:26 +0200 Subject: [PATCH 31/36] [procedure] Added support for custom values on the return statement. This enables constructs like this in procedures: ```yaml - return - return 1 - return: ${output} ``` --- platypush/procedure/__init__.py | 61 +++++++++++++++++++++++++++++---- 1 file changed, 55 insertions(+), 6 deletions(-) diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 50ac5bb49c..36c57c4dfa 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -1,10 +1,11 @@ import enum import logging import re +from dataclasses import dataclass from functools import wraps from queue import LifoQueue -from typing import Optional +from typing import Any, Optional from ..common import exec_wrapper from ..config import Config @@ -14,7 +15,7 @@ from ..message.response import Response logger = logging.getLogger('platypush') -class Statement(enum.Enum): +class StatementType(enum.Enum): """ Enumerates the possible statements in a procedure. """ @@ -24,6 +25,45 @@ class Statement(enum.Enum): RETURN = 'return' +@dataclass +class Statement: + """ + Models a statement in a procedure. + """ + + type: StatementType + argument: Optional[str] = None + + @classmethod + def build(cls, statement: str): + """ + Builds a statement from a string. + """ + + m = re.match(r'\s*return\s*(.*)\s*', statement, re.IGNORECASE) + if m: + return ReturnStatement(argument=m.group(1)) + + return cls(StatementType(statement.lower())) + + def run(self, *_, **__) -> Optional[Any]: + """ + Executes the statement. + """ + + +@dataclass +class ReturnStatement(Statement): + """ + Models a return statement in a procedure. + """ + + type: StatementType = StatementType.RETURN + + def run(self, *_, **context): + return Request.expand_value_from_context(self.argument, **context) + + class Procedure: """Procedure class. A procedure is a pre-configured list of requests""" @@ -70,7 +110,15 @@ class Procedure: for request_config in requests: # Check if it's a break/continue/return statement if isinstance(request_config, str): - reqs.append(Statement(request_config)) + reqs.append(Statement.build(request_config)) + continue + + # Check if it's a return statement with a value + if ( + len(request_config.keys()) == 1 + and list(request_config.keys())[0] == 'return' + ): + reqs.append(ReturnStatement(argument=request_config['return'])) continue # Check if this request is an if-else @@ -218,15 +266,16 @@ class Procedure: continue if isinstance(request, Statement): - if request == Statement.RETURN: + if isinstance(request, ReturnStatement): + response = Response(output=request.run(**context)) self._should_return = True for proc in __stack__: proc._should_return = True # pylint: disable=protected-access break - if request in [Statement.BREAK, Statement.CONTINUE]: + if request.type in [StatementType.BREAK, StatementType.CONTINUE]: loop = self._find_nearest_loop(__stack__) - if request == Statement.BREAK: + if request == StatementType.BREAK: loop._should_break = True # pylint: disable=protected-access else: loop._should_continue = True # pylint: disable=protected-access From 853fce2521e84e10f2dee10c386913c04b3c348d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 12 Sep 2024 02:14:40 +0200 Subject: [PATCH 32/36] [procedures] Fixed `if` queue flushing logic. Any pending `if`s in the parsing queue of a procedure should also be cleared if the current statement in the procedure is a break/continue/return. In such case we should terminate the current branch, and that involves ensuring that any `if`s branches that are still being parsed are inserted before the branch-terminating statement. --- platypush/procedure/__init__.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 36c57c4dfa..e2d5641496 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from functools import wraps from queue import LifoQueue -from typing import Any, Optional +from typing import Any, List, Optional from ..common import exec_wrapper from ..config import Config @@ -110,6 +110,7 @@ class Procedure: for request_config in requests: # Check if it's a break/continue/return statement if isinstance(request_config, str): + cls._flush_if_statements(reqs, if_config) reqs.append(Statement.build(request_config)) continue @@ -118,6 +119,7 @@ class Procedure: len(request_config.keys()) == 1 and list(request_config.keys())[0] == 'return' ): + cls._flush_if_statements(reqs, if_config) reqs.append(ReturnStatement(argument=request_config['return'])) continue @@ -214,9 +216,7 @@ class Procedure: request = Request.build(request_config) reqs.append(request) - while not if_config.empty(): - pending_if = if_config.get() - reqs.append(IfProcedure.build(**pending_if)) + cls._flush_if_statements(reqs, if_config) return procedure_class( name=name, @@ -227,6 +227,12 @@ class Procedure: **kwargs, ) + @staticmethod + def _flush_if_statements(requests: List, if_config: LifoQueue): + while not if_config.empty(): + pending_if = if_config.get() + requests.append(IfProcedure.build(**pending_if)) + @staticmethod def _find_nearest_loop(stack): for proc in stack[::-1]: From 771e32e368d7ead1d8e387cee67e5d5363dbc283 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 13 Sep 2024 18:21:27 +0200 Subject: [PATCH 33/36] [#341] `procedure._serialize_action` should also support strings. --- platypush/plugins/procedures/__init__.py | 4 +++- platypush/procedure/__init__.py | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index cffd2f65f1..0f104ed027 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -284,7 +284,7 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): return re.sub(r'[^\w.]+', '_', (name or '').strip(' .')) @classmethod - def _serialize_action(cls, data: Union[Iterable, Dict]) -> Union[Dict, List]: + def _serialize_action(cls, data: Union[Iterable, Dict]) -> Union[Dict, List, str]: if isinstance(data, dict): name = data.get('action', data.get('name')) if name: @@ -301,6 +301,8 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ) for k, v in data.items() } + elif isinstance(data, str): + return data else: return [cls._serialize_action(item) for item in data if item is not None] diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index e2d5641496..bb5c01675e 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -281,7 +281,7 @@ class Procedure: if request.type in [StatementType.BREAK, StatementType.CONTINUE]: loop = self._find_nearest_loop(__stack__) - if request == StatementType.BREAK: + if request.type == StatementType.BREAK: loop._should_break = True # pylint: disable=protected-access else: loop._should_continue = True # pylint: disable=protected-access @@ -291,9 +291,9 @@ class Procedure: should_break = getattr(self, '_should_break', False) if isinstance(self, LoopProcedure) and (should_continue or should_break): if should_continue: - self._should_continue = ( # pylint: disable=attribute-defined-outside-init - False - ) + setattr(self, '_should_continue', False) # noqa[B010] + else: + setattr(self, '_should_break', False) # noqa[B010] break From be8140ddb5f40b644eb9d8f61d7ae5d7b20f0ccf Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 16 Sep 2024 03:16:53 +0200 Subject: [PATCH 34/36] [procedure] Several improvements to the procedure engine. - Add `set` statement, which can be used to set context variables within YAML procedures. Example: ```yaml procedure.test: - set: foo: bar - action: logger.info args: msg: ${bar} ``` - More reliable flow control for nested break/continue/return. - Propagate changes to context variables also to upstream procedures. --- platypush/procedure/__init__.py | 205 ++++++++++++++++++-------------- 1 file changed, 119 insertions(+), 86 deletions(-) diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index bb5c01675e..0e28b5f1a9 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -1,11 +1,12 @@ import enum import logging import re -from dataclasses import dataclass +from copy import deepcopy +from dataclasses import dataclass, field from functools import wraps from queue import LifoQueue -from typing import Any, List, Optional +from typing import Any, Dict, Iterable, List, Optional from ..common import exec_wrapper from ..config import Config @@ -23,6 +24,7 @@ class StatementType(enum.Enum): BREAK = 'break' CONTINUE = 'continue' RETURN = 'return' + SET = 'set' @dataclass @@ -32,7 +34,7 @@ class Statement: """ type: StatementType - argument: Optional[str] = None + argument: Optional[Any] = None @classmethod def build(cls, statement: str): @@ -60,8 +62,30 @@ class ReturnStatement(Statement): type: StatementType = StatementType.RETURN + def run(self, *_, **context) -> Any: + return Response( + output=Request.expand_value_from_context( + self.argument, **_update_context(context) + ) + ) + + +@dataclass +class SetStatement(Statement): + """ + Models a set variable statement in a procedure. + """ + + type: StatementType = StatementType.SET + vars: dict = field(default_factory=dict) + def run(self, *_, **context): - return Request.expand_value_from_context(self.argument, **context) + vars = deepcopy(self.vars) # pylint: disable=redefined-builtin + for k, v in vars.items(): + vars[k] = Request.expand_value_from_context(v, **context) + + context.update(vars) + return Response(output=vars) class Procedure: @@ -117,10 +141,20 @@ class Procedure: # Check if it's a return statement with a value if ( len(request_config.keys()) == 1 - and list(request_config.keys())[0] == 'return' + and list(request_config.keys())[0] == StatementType.RETURN.value ): cls._flush_if_statements(reqs, if_config) - reqs.append(ReturnStatement(argument=request_config['return'])) + reqs.append( + ReturnStatement(argument=request_config[StatementType.RETURN.value]) + ) + continue + + # Check if it's a variable set statement + if (len(request_config.keys()) == 1) and ( + list(request_config.keys())[0] == StatementType.SET.value + ): + cls._flush_if_statements(reqs, if_config) + reqs.append(SetStatement(vars=request_config[StatementType.SET.value])) continue # Check if this request is an if-else @@ -129,6 +163,7 @@ class Procedure: m = re.match(r'\s*(if)\s+\${(.*)}\s*', key) if m: + cls._flush_if_statements(reqs, if_config) if_count += 1 if_name = f'{name}__if_{if_count}' condition = m.group(2) @@ -233,86 +268,96 @@ class Procedure: pending_if = if_config.get() requests.append(IfProcedure.build(**pending_if)) - @staticmethod - def _find_nearest_loop(stack): - for proc in stack[::-1]: - if isinstance(proc, LoopProcedure): - return proc - - raise AssertionError('break/continue statement found outside of a loop') - # pylint: disable=too-many-branches,too-many-statements - def execute(self, n_tries=1, __stack__=None, **context): + def execute( + self, + n_tries: int = 1, + __stack__: Optional[Iterable] = None, + new_context: Optional[Dict[str, Any]] = None, + **context, + ): """ Execute the requests in the procedure. :param n_tries: Number of tries in case of failure before raising a RuntimeError. """ - if not __stack__: - __stack__ = [self] - else: - __stack__.append(self) + __stack__ = (self,) if not __stack__ else (self, *__stack__) + new_context = new_context or {} if self.args: args = self.args.copy() for k, v in args.items(): - v = Request.expand_value_from_context(v, **context) - args[k] = v - context[k] = v + args[k] = context[k] = Request.expand_value_from_context(v, **context) logger.info('Executing procedure %s with arguments %s', self.name, args) else: logger.info('Executing procedure %s', self.name) response = Response() token = Config.get('token') + context = _update_context(context) + locals().update(context) + # pylint: disable=too-many-nested-blocks for request in self.requests: if callable(request): response = request(**context) continue + context['_async'] = self._async + context['n_tries'] = n_tries + context['__stack__'] = __stack__ + context['new_context'] = new_context + if isinstance(request, Statement): if isinstance(request, ReturnStatement): - response = Response(output=request.run(**context)) + response = request.run(**context) self._should_return = True for proc in __stack__: proc._should_return = True # pylint: disable=protected-access + break + if isinstance(request, SetStatement): + rs: dict = request.run(**context).output # type: ignore + context.update(rs) + new_context.update(rs) + locals().update(rs) + continue + if request.type in [StatementType.BREAK, StatementType.CONTINUE]: - loop = self._find_nearest_loop(__stack__) - if request.type == StatementType.BREAK: - loop._should_break = True # pylint: disable=protected-access - else: - loop._should_continue = True # pylint: disable=protected-access + for proc in __stack__: + if isinstance(proc, LoopProcedure): + if request.type == StatementType.BREAK: + setattr(proc, '_should_break', True) # noqa: B010 + else: + setattr(proc, '_should_continue', True) # noqa: B010 + break + + proc._should_return = True # pylint: disable=protected-access + break should_continue = getattr(self, '_should_continue', False) should_break = getattr(self, '_should_break', False) - if isinstance(self, LoopProcedure) and (should_continue or should_break): - if should_continue: - setattr(self, '_should_continue', False) # noqa[B010] - else: - setattr(self, '_should_break', False) # noqa[B010] - + if self._should_return or should_continue or should_break: break if token and not isinstance(request, Statement): request.token = token - context['_async'] = self._async - context['n_tries'] = n_tries exec_ = getattr(request, 'execute', None) if callable(exec_): - response = exec_(__stack__=__stack__, **context) + response = exec_(**context) + context.update(context.get('new_context', {})) if not self._async and response: if isinstance(response.output, dict): - for k, v in response.output.items(): - context[k] = v + context.update(response.output) context['output'] = response.output context['errors'] = response.errors + new_context.update(context) + locals().update(context) if self._should_return: break @@ -333,10 +378,8 @@ class LoopProcedure(Procedure): Base class while and for/fork loops. """ - def __init__(self, name, requests, _async=False, args=None, backend=None): - super().__init__( - name=name, _async=_async, requests=requests, args=args, backend=backend - ) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self._should_break = False self._should_continue = False @@ -381,6 +424,9 @@ class ForProcedure(LoopProcedure): # pylint: disable=eval-used def execute(self, *_, **context): + ctx = _update_context(context) + locals().update(ctx) + try: iterable = eval(self.iterable) assert hasattr( @@ -388,11 +434,18 @@ class ForProcedure(LoopProcedure): ), f'Object of type {type(iterable)} is not iterable: {iterable}' except Exception as e: logger.debug('Iterable %s expansion error: %s', self.iterable, e) - iterable = Request.expand_value_from_context(self.iterable, **context) + iterable = Request.expand_value_from_context(self.iterable, **ctx) response = Response() for item in iterable: + ctx[self.iterator_name] = item + response = super().execute(**ctx) + ctx.update(ctx.get('new_context', {})) + + if response.output and isinstance(response.output, dict): + ctx = _update_context(ctx, **response.output) + if self._should_return: logger.info('Returning from %s', self.name) break @@ -407,9 +460,6 @@ class ForProcedure(LoopProcedure): logger.info('Breaking loop %s', self.name) break - context[self.iterator_name] = item - response = super().execute(**context) - return response @@ -446,41 +496,23 @@ class WhileProcedure(LoopProcedure): ) self.condition = condition - @staticmethod - def _get_context(**context): - for k, v in context.items(): - try: - context[k] = eval(v) # pylint: disable=eval-used - except Exception as e: - logger.debug('Evaluation error for %s=%s: %s', k, v, e) - if isinstance(v, str): - try: - context[k] = eval( # pylint: disable=eval-used - '"' + re.sub(r'(^|[^\\])"', '\1\\"', v) + '"' - ) - except Exception as ee: - logger.warning( - 'Could not parse value for context variable %s=%s: %s', - k, - v, - ee, - ) - logger.warning('Context: %s', context) - logger.exception(e) - - return context - def execute(self, *_, **context): response = Response() - context = self._get_context(**context) - for k, v in context.items(): - locals()[k] = v + ctx = _update_context(context) + locals().update(ctx) while True: condition_true = eval(self.condition) # pylint: disable=eval-used if not condition_true: break + response = super().execute(**ctx) + ctx.update(ctx.get('new_context', {})) + if response.output and isinstance(response.output, dict): + _update_context(ctx, **response.output) + + locals().update(ctx) + if self._should_return: logger.info('Returning from %s', self.name) break @@ -495,13 +527,6 @@ class WhileProcedure(LoopProcedure): logger.info('Breaking loop %s', self.name) break - response = super().execute(**context) - - if response.output and isinstance(response.output, dict): - new_context = self._get_context(**response.output) - for k, v in new_context.items(): - locals()[k] = v - return response @@ -595,20 +620,28 @@ class IfProcedure(Procedure): ) def execute(self, *_, **context): - for k, v in context.items(): - locals()[k] = v - + ctx = _update_context(context) + locals().update(ctx) condition_true = eval(self.condition) # pylint: disable=eval-used response = Response() if condition_true: - response = super().execute(**context) + response = super().execute(**ctx) elif self.else_branch: - response = self.else_branch.execute(**context) + response = self.else_branch.execute(**ctx) return response +def _update_context(context: Optional[Dict[str, Any]] = None, **kwargs): + ctx = context or {} + ctx = {**ctx.get('context', {}), **ctx, **kwargs} + for k, v in ctx.items(): + ctx[k] = Request.expand_value_from_context(v, **ctx) + + return ctx + + def procedure(name_or_func: Optional[str] = None, *upper_args, **upper_kwargs): name = name_or_func if isinstance(name_or_func, str) else None From 38c1ebb90c7ed6450d5392bd41397b1c9257d4b9 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 23 Sep 2024 03:20:55 +0200 Subject: [PATCH 35/36] [alarm] Don't fail if no audio file is provided. --- platypush/plugins/alarm/__init__.py | 23 ++++++++++++++--------- platypush/plugins/alarm/_model.py | 3 +++ 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/platypush/plugins/alarm/__init__.py b/platypush/plugins/alarm/__init__.py index 7a5c48d3a3..f98ebe53b5 100644 --- a/platypush/plugins/alarm/__init__.py +++ b/platypush/plugins/alarm/__init__.py @@ -179,13 +179,13 @@ class AlarmPlugin(RunnablePlugin, EntityManager): else: # If the alarm record on the db is static, but the alarm is no # longer present in the configuration, then we want to delete it - if alarm.static: + if bool(alarm.static): self._clear_alarm(alarm, session) else: self.alarms[name] = Alarm.from_db( alarm, stop_event=self._should_stop, - media_plugin=alarm.media_plugin or self.media_plugin, + media_plugin=str(alarm.media_plugin) or self.media_plugin, on_change=self._on_alarm_update, ) @@ -215,7 +215,12 @@ class AlarmPlugin(RunnablePlugin, EntityManager): def _clear_alarm(self, alarm: DbAlarm, session: Session): alarm_obj = self.alarms.pop(str(alarm.name), None) if alarm_obj: - alarm_obj.stop() + try: + alarm_obj.stop() + except Exception as e: + self.logger.warning( + f'Error while stopping alarm {alarm.name}: {e}', exc_info=True + ) session.delete(alarm) self._bus.post(EntityDeleteEvent(entity=alarm)) @@ -439,15 +444,15 @@ class AlarmPlugin(RunnablePlugin, EntityManager): when=when or alarm.when, media=media or alarm.media, media_plugin=media_plugin or alarm.media_plugin or self.media_plugin, - media_repeat=media_repeat - if media_repeat is not None - else alarm.media_repeat, + media_repeat=( + media_repeat if media_repeat is not None else alarm.media_repeat + ), actions=actions if actions is not None else (alarm.actions or []), name=new_name or name, enabled=enabled if enabled is not None else alarm.is_enabled(), - audio_volume=audio_volume - if audio_volume is not None - else alarm.audio_volume, + audio_volume=( + audio_volume if audio_volume is not None else alarm.audio_volume + ), snooze_interval=snooze_interval or alarm.snooze_interval, dismiss_interval=dismiss_interval or alarm.dismiss_interval, ).to_dict() diff --git a/platypush/plugins/alarm/_model.py b/platypush/plugins/alarm/_model.py index 81799154b6..5441885d45 100644 --- a/platypush/plugins/alarm/_model.py +++ b/platypush/plugins/alarm/_model.py @@ -274,6 +274,9 @@ class Alarm: if self.audio_volume is not None: self._get_media_plugin().set_volume(self.audio_volume) + if not self.media: + return + audio_thread = threading.Thread(target=thread) audio_thread.start() From 8d8e1878bb41b2fbd98387332d527156999a2b96 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 23 Sep 2024 03:44:05 +0200 Subject: [PATCH 36/36] Updated CHANGELOG --- CHANGELOG.md | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4c98d1a30..4d1337c016 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,44 @@ # Changelog +## [Unreleased] + +- [[#333](https://git.platypush.tech/platypush/platypush/issues/333)]: new file + browser UI/component. It includes custom MIME type support, a file editor + with syntax highlight, file download and file upload. + +- [[#341](https://git.platypush.tech/platypush/platypush/issues/341)]: + procedures are now native entities that can be managed from the entities panel. + A new versatile procedure editor has also been added, with support for nested + blocks, conditions, loops, variables, context autocomplete, and more. + +- [`procedure`]: Added the following features to YAML/structured procedures: + + - `set`: to set variables whose scope is limited to the procedure / code + block where they are created. `variable.set` is useful to permanently + store variables on the db, `variable.mset` is useful to set temporary + global variables in memory through Redis, but sometimes you may just want + to assign a value to a variable that only needs to live within a procedure, + event hook or cron. + + ```yaml + - set: + foo: bar + temperature: ${output.get('temperature')} + ``` + + - `return` can now return values too when invoked within a procedure: + + ```yaml + - return: something + # Or + - return: "Result: ${output.get('response')}" + ``` + +- The default logging format is now much more compact. The full body of events + and requests is no longer included by default in `info` mode - instead, a + summary with the message type, ID and response time is logged. The full + payloads can still be logged by enabling `debug` logs through e.g. `-v`. + ## [1.2.3] - [[#422](https://git.platypush.tech/platypush/platypush/issues/422)]: adapted