diff --git a/.drone/github-mirror.sh b/.drone/github-mirror.sh index 20832d4b51..4727946c85 100755 --- a/.drone/github-mirror.sh +++ b/.drone/github-mirror.sh @@ -6,9 +6,18 @@ ssh-keyscan github.com >> ~/.ssh/known_hosts 2>/dev/null # Clone the repository +branch=$(git rev-parse --abbrev-ref HEAD) +if [ -z "${branch}" ]; then + echo "No branch checked out" + exit 1 +fi + git remote add github git@github.com:/blacklight/platypush.git -git pull --rebase github "$(git branch | head -1 | awk '{print $2}')" || echo "No such branch on Github" + +if (( "$branch" == "master" )); then + git pull --rebase github "${branch}" || echo "No such branch on Github" +fi # Push the changes to the GitHub mirror -git push --all -v github +git push -f --all -v github git push --tags -v github diff --git a/CHANGELOG.md b/CHANGELOG.md index c4c98d1a30..4d1337c016 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,44 @@ # Changelog +## [Unreleased] + +- [[#333](https://git.platypush.tech/platypush/platypush/issues/333)]: new file + browser UI/component. It includes custom MIME type support, a file editor + with syntax highlight, file download and file upload. + +- [[#341](https://git.platypush.tech/platypush/platypush/issues/341)]: + procedures are now native entities that can be managed from the entities panel. + A new versatile procedure editor has also been added, with support for nested + blocks, conditions, loops, variables, context autocomplete, and more. + +- [`procedure`]: Added the following features to YAML/structured procedures: + + - `set`: to set variables whose scope is limited to the procedure / code + block where they are created. `variable.set` is useful to permanently + store variables on the db, `variable.mset` is useful to set temporary + global variables in memory through Redis, but sometimes you may just want + to assign a value to a variable that only needs to live within a procedure, + event hook or cron. + + ```yaml + - set: + foo: bar + temperature: ${output.get('temperature')} + ``` + + - `return` can now return values too when invoked within a procedure: + + ```yaml + - return: something + # Or + - return: "Result: ${output.get('response')}" + ``` + +- The default logging format is now much more compact. The full body of events + and requests is no longer included by default in `info` mode - instead, a + summary with the message type, ID and response time is logged. The full + payloads can still be logged by enabling `debug` logs through e.g. `-v`. + ## [1.2.3] - [[#422](https://git.platypush.tech/platypush/platypush/issues/422)]: adapted diff --git a/platypush/app/_app.py b/platypush/app/_app.py index 9bcffdba03..4f16e69c29 100644 --- a/platypush/app/_app.py +++ b/platypush/app/_app.py @@ -365,7 +365,13 @@ class Application: elif isinstance(msg, Response): msg.log() elif isinstance(msg, Event): - msg.log() + log.info( + 'Received event: %s.%s[id=%s]', + msg.__class__.__module__, + msg.__class__.__name__, + msg.id, + ) + msg.log(level=logging.DEBUG) self.event_processor.process_event(msg) return _f diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index 945a47af85..3eaa7ab5a6 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -179,6 +179,8 @@ class Config: self._config['logging'] = logging_config def _init_db(self, db: Optional[str] = None): + self._config['_db'] = self._config.get('db', {}) + # If the db connection string is passed as an argument, use it if db: self._config['db'] = { diff --git a/platypush/entities/procedures.py b/platypush/entities/procedures.py index eef3427bbc..14c6ad14d3 100644 --- a/platypush/entities/procedures.py +++ b/platypush/entities/procedures.py @@ -1,8 +1,9 @@ import logging +from enum import Enum from sqlalchemy import ( Column, - Enum, + Enum as DbEnum, ForeignKey, Integer, JSON, @@ -16,6 +17,12 @@ from . import Entity logger = logging.getLogger(__name__) +class ProcedureType(Enum): + PYTHON = 'python' + CONFIG = 'config' + DB = 'db' + + if not is_defined('procedure'): class Procedure(Entity): @@ -30,7 +37,13 @@ if not is_defined('procedure'): ) args = Column(JSON, nullable=False, default=[]) procedure_type = Column( - Enum('python', 'config', name='procedure_type'), nullable=False + DbEnum( + *[m.value for m in ProcedureType.__members__.values()], + name='procedure_type', + create_constraint=True, + validate_strings=True, + ), + nullable=False, ) module = Column(String) source = Column(String) diff --git a/platypush/entities/system.py b/platypush/entities/system.py index 0421399adb..0ed543185e 100644 --- a/platypush/entities/system.py +++ b/platypush/entities/system.py @@ -1,289 +1,249 @@ from sqlalchemy import Boolean, Column, Float, ForeignKey, Integer, JSON, String -from platypush.common.db import is_defined - from . import Entity from .devices import Device from .sensors import NumericSensor, PercentSensor from .temperature import TemperatureSensor -if not is_defined('cpu'): +class Cpu(Entity): + """ + ``CPU`` ORM (container) model. + """ - class Cpu(Entity): - """ - ``CPU`` ORM (container) model. - """ + __tablename__ = 'cpu' - __tablename__ = 'cpu' + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + percent = Column(Float) - percent = Column(Float) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } +class CpuInfo(Entity): + """ + ``CpuInfo`` ORM model. + """ -if not is_defined('cpu_info'): + __tablename__ = 'cpu_info' - class CpuInfo(Entity): - """ - ``CpuInfo`` ORM model. - """ + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - __tablename__ = 'cpu_info' + architecture = Column(String) + bits = Column(Integer) + cores = Column(Integer) + vendor = Column(String) + brand = Column(String) + frequency_advertised = Column(Integer) + frequency_actual = Column(Integer) + flags = Column(JSON) + l1_instruction_cache_size = Column(Integer) + l1_data_cache_size = Column(Integer) + l2_cache_size = Column(Integer) + l3_cache_size = Column(Integer) - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - architecture = Column(String) - bits = Column(Integer) - cores = Column(Integer) - vendor = Column(String) - brand = Column(String) - frequency_advertised = Column(Integer) - frequency_actual = Column(Integer) - flags = Column(JSON) - l1_instruction_cache_size = Column(Integer) - l1_data_cache_size = Column(Integer) - l2_cache_size = Column(Integer) - l3_cache_size = Column(Integer) - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } +class CpuTimes(Entity): + """ + ``CpuTimes`` ORM (container) model. + """ + __tablename__ = 'cpu_times' -if not is_defined('cpu_times'): + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - class CpuTimes(Entity): - """ - ``CpuTimes`` ORM (container) model. - """ + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - __tablename__ = 'cpu_times' - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) +class CpuStats(Entity): + """ + ``CpuStats`` ORM (container) model. + """ - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + __tablename__ = 'cpu_stats' + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) -if not is_defined('cpu_stats'): + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - class CpuStats(Entity): - """ - ``CpuStats`` ORM (container) model. - """ - __tablename__ = 'cpu_stats' +class MemoryStats(Entity): + """ + ``MemoryStats`` ORM model. + """ - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + __tablename__ = 'memory_stats' - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) + total = Column(Integer) + available = Column(Integer) + used = Column(Integer) + free = Column(Integer) + active = Column(Integer) + inactive = Column(Integer) + buffers = Column(Integer) + cached = Column(Integer) + shared = Column(Integer) + percent = Column(Float) -if not is_defined('memory_stats'): + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } - class MemoryStats(Entity): - """ - ``MemoryStats`` ORM model. - """ - __tablename__ = 'memory_stats' +class SwapStats(Entity): + """ + ``SwapStats`` ORM model. + """ - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + __tablename__ = 'swap_stats' - total = Column(Integer) - available = Column(Integer) - used = Column(Integer) - free = Column(Integer) - active = Column(Integer) - inactive = Column(Integer) - buffers = Column(Integer) - cached = Column(Integer) - shared = Column(Integer) - percent = Column(Float) + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + total = Column(Integer) + used = Column(Integer) + free = Column(Integer) + percent = Column(Float) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } -if not is_defined('swap_stats'): - class SwapStats(Entity): - """ - ``SwapStats`` ORM model. - """ +class Disk(Entity): + """ + ``Disk`` ORM model. + """ - __tablename__ = 'swap_stats' + __tablename__ = 'disk' - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) - - total = Column(Integer) - used = Column(Integer) - free = Column(Integer) - percent = Column(Float) + id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True) - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + mountpoint = Column(String) + fstype = Column(String) + opts = Column(String) + total = Column(Integer) + used = Column(Integer) + free = Column(Integer) + percent = Column(Float) + read_count = Column(Integer) + write_count = Column(Integer) + read_bytes = Column(Integer) + write_bytes = Column(Integer) + read_time = Column(Float) + write_time = Column(Float) + busy_time = Column(Float) + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } -if not is_defined('disk'): - class Disk(Entity): - """ - ``Disk`` ORM model. - """ +class NetworkInterface(Device): + """ + ``NetworkInterface`` ORM model. + """ - __tablename__ = 'disk' + __tablename__ = 'network_interface' - id = Column( - Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True - ) + id = Column(Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True) - mountpoint = Column(String) - fstype = Column(String) - opts = Column(String) - total = Column(Integer) - used = Column(Integer) - free = Column(Integer) - percent = Column(Float) - read_count = Column(Integer) - write_count = Column(Integer) - read_bytes = Column(Integer) - write_bytes = Column(Integer) - read_time = Column(Float) - write_time = Column(Float) - busy_time = Column(Float) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('network_interface'): - - class NetworkInterface(Device): - """ - ``NetworkInterface`` ORM model. - """ - - __tablename__ = 'network_interface' - - id = Column( - Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True - ) - - bytes_sent = Column(Integer) - bytes_recv = Column(Integer) - packets_sent = Column(Integer) - packets_recv = Column(Integer) - errors_in = Column(Integer) - errors_out = Column(Integer) - drop_in = Column(Integer) - drop_out = Column(Integer) - addresses = Column(JSON) - speed = Column(Integer) - mtu = Column(Integer) - duplex = Column(String) - flags = Column(JSON) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('system_temperature'): - - class SystemTemperature(TemperatureSensor): - """ - Extends the ``TemperatureSensor``. - """ - - __tablename__ = 'system_temperature' - - id = Column( - Integer, - ForeignKey(TemperatureSensor.id, ondelete='CASCADE'), - primary_key=True, - ) - - high = Column(Float) - critical = Column(Float) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('system_fan'): - - class SystemFan(NumericSensor): - """ - ``SystemFan`` ORM model. - """ - - __tablename__ = 'system_fan' - - id = Column( - Integer, - ForeignKey(NumericSensor.id, ondelete='CASCADE'), - primary_key=True, - ) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } - - -if not is_defined('system_battery'): - - class SystemBattery(PercentSensor): - """ - ``SystemBattery`` ORM model. - """ - - __tablename__ = 'system_battery' - - id = Column( - Integer, - ForeignKey(PercentSensor.id, ondelete='CASCADE'), - primary_key=True, - ) - - seconds_left = Column(Float) - power_plugged = Column(Boolean) - - __table_args__ = {'extend_existing': True} - __mapper_args__ = { - 'polymorphic_identity': __tablename__, - } + bytes_sent = Column(Integer) + bytes_recv = Column(Integer) + packets_sent = Column(Integer) + packets_recv = Column(Integer) + errors_in = Column(Integer) + errors_out = Column(Integer) + drop_in = Column(Integer) + drop_out = Column(Integer) + addresses = Column(JSON) + speed = Column(Integer) + mtu = Column(Integer) + duplex = Column(String) + flags = Column(JSON) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + +class SystemTemperature(TemperatureSensor): + """ + Extends the ``TemperatureSensor``. + """ + + __tablename__ = 'system_temperature' + + id = Column( + Integer, + ForeignKey(TemperatureSensor.id, ondelete='CASCADE'), + primary_key=True, + ) + + high = Column(Float) + critical = Column(Float) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + +class SystemFan(NumericSensor): + """ + ``SystemFan`` ORM model. + """ + + __tablename__ = 'system_fan' + + id = Column( + Integer, + ForeignKey(NumericSensor.id, ondelete='CASCADE'), + primary_key=True, + ) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } + + +class SystemBattery(PercentSensor): + """ + ``SystemBattery`` ORM model. + """ + + __tablename__ = 'system_battery' + + id = Column( + Integer, + ForeignKey(PercentSensor.id, ondelete='CASCADE'), + primary_key=True, + ) + + seconds_left = Column(Float) + power_plugged = Column(Boolean) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 65106da2b4..69c9673f71 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -8,7 +8,7 @@ import logging import inspect import json import time -from typing import Union +from typing import Optional, Union from uuid import UUID _logger = logging.getLogger('platypush') @@ -114,18 +114,19 @@ class Message: self._logger = _logger self._default_log_prefix = '' - def log(self, prefix=''): + def log(self, level: Optional[int] = None, prefix=''): if self.logging_level is None: return # Skip logging log_func = self._logger.info - if self.logging_level == logging.DEBUG: + level = level if level is not None else self.logging_level + if level == logging.DEBUG: log_func = self._logger.debug - elif self.logging_level == logging.WARNING: + elif level == logging.WARNING: log_func = self._logger.warning - elif self.logging_level == logging.ERROR: + elif level == logging.ERROR: log_func = self._logger.error - elif self.logging_level == logging.FATAL: + elif level == logging.FATAL: log_func = self._logger.fatal if not prefix: diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index b470b3dc41..8dda18eb7d 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -64,12 +64,13 @@ class Request(Message): msg = super().parse(msg) args = { 'target': msg.get('target', Config.get('device_id')), - 'action': msg['action'], + 'action': msg.get('action', msg.get('name')), 'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(), 'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(), } + assert args.get('action'), 'No action specified in the request' if 'origin' in msg: args['origin'] = msg['origin'] if 'token' in msg: @@ -100,7 +101,7 @@ class Request(Message): proc = Procedure.build( name=proc_name, requests=proc_config['actions'], - _async=proc_config['_async'], + _async=proc_config.get('_async', False), args=self.args, backend=self.backend, id=self.id, @@ -165,6 +166,11 @@ class Request(Message): context_value = [*context_value] if isinstance(context_value, datetime.date): context_value = context_value.isoformat() + except NameError as e: + logger.warning( + 'Could not expand expression "%s": %s', inner_expr, e + ) + context_value = expr except Exception as e: logger.exception(e) context_value = expr @@ -188,7 +194,13 @@ class Request(Message): response.id = self.id response.target = self.origin response.origin = Config.get('device_id') - response.log() + self._logger.info( + 'Sending response to request[id=%s, action=%s], response_time=%.02fs', + self.id, + self.action, + response.timestamp - self.timestamp, + ) + response.log(level=logging.DEBUG) if self.backend and self.origin: self.backend.send_response(response=response, request=self) @@ -221,7 +233,10 @@ class Request(Message): from platypush.plugins import RunnablePlugin response = None - self.log() + self._logger.info( + 'Executing request[id=%s, action=%s]', self.id, self.action + ) + self.log(level=logging.DEBUG) try: if self.action.startswith('procedure.'): diff --git a/platypush/message/response/__init__.py b/platypush/message/response/__init__.py index f1db626e24..2a16a1cb5d 100644 --- a/platypush/message/response/__init__.py +++ b/platypush/message/response/__init__.py @@ -1,6 +1,7 @@ import json import logging import time +from typing import Optional from platypush.message import Message @@ -99,8 +100,11 @@ class Response(Message): return json.dumps(response_dict, cls=self.Encoder) - def log(self, *args, **kwargs): - self.logging_level = logging.WARNING if self.is_error() else logging.INFO + def log(self, *args, level: Optional[int] = None, **kwargs): + if level is None: + level = logging.WARNING if self.is_error() else logging.INFO + + kwargs['level'] = level super().log(*args, **kwargs) diff --git a/platypush/plugins/alarm/__init__.py b/platypush/plugins/alarm/__init__.py index 7a5c48d3a3..f98ebe53b5 100644 --- a/platypush/plugins/alarm/__init__.py +++ b/platypush/plugins/alarm/__init__.py @@ -179,13 +179,13 @@ class AlarmPlugin(RunnablePlugin, EntityManager): else: # If the alarm record on the db is static, but the alarm is no # longer present in the configuration, then we want to delete it - if alarm.static: + if bool(alarm.static): self._clear_alarm(alarm, session) else: self.alarms[name] = Alarm.from_db( alarm, stop_event=self._should_stop, - media_plugin=alarm.media_plugin or self.media_plugin, + media_plugin=str(alarm.media_plugin) or self.media_plugin, on_change=self._on_alarm_update, ) @@ -215,7 +215,12 @@ class AlarmPlugin(RunnablePlugin, EntityManager): def _clear_alarm(self, alarm: DbAlarm, session: Session): alarm_obj = self.alarms.pop(str(alarm.name), None) if alarm_obj: - alarm_obj.stop() + try: + alarm_obj.stop() + except Exception as e: + self.logger.warning( + f'Error while stopping alarm {alarm.name}: {e}', exc_info=True + ) session.delete(alarm) self._bus.post(EntityDeleteEvent(entity=alarm)) @@ -439,15 +444,15 @@ class AlarmPlugin(RunnablePlugin, EntityManager): when=when or alarm.when, media=media or alarm.media, media_plugin=media_plugin or alarm.media_plugin or self.media_plugin, - media_repeat=media_repeat - if media_repeat is not None - else alarm.media_repeat, + media_repeat=( + media_repeat if media_repeat is not None else alarm.media_repeat + ), actions=actions if actions is not None else (alarm.actions or []), name=new_name or name, enabled=enabled if enabled is not None else alarm.is_enabled(), - audio_volume=audio_volume - if audio_volume is not None - else alarm.audio_volume, + audio_volume=( + audio_volume if audio_volume is not None else alarm.audio_volume + ), snooze_interval=snooze_interval or alarm.snooze_interval, dismiss_interval=dismiss_interval or alarm.dismiss_interval, ).to_dict() diff --git a/platypush/plugins/alarm/_model.py b/platypush/plugins/alarm/_model.py index 81799154b6..5441885d45 100644 --- a/platypush/plugins/alarm/_model.py +++ b/platypush/plugins/alarm/_model.py @@ -274,6 +274,9 @@ class Alarm: if self.audio_volume is not None: self._get_media_plugin().set_volume(self.audio_volume) + if not self.media: + return + audio_thread = threading.Thread(target=thread) audio_thread.start() diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 5f32258c20..0e4b84a9a1 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -40,8 +40,13 @@ class DbPlugin(Plugin): (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ - super().__init__() - self.engine_url = engine + from platypush.config import Config + + kwargs.update(Config.get('_db', {})) + super().__init__(*args, **kwargs) + self.engine_url = engine or kwargs.pop('engine', None) + self.args = args + self.kwargs = kwargs self.engine = self.get_engine(engine, *args, **kwargs) def get_engine( @@ -50,6 +55,10 @@ class DbPlugin(Plugin): if engine == self.engine_url and self.engine: return self.engine + if not args: + args = self.args + kwargs = {**self.kwargs, **kwargs} + if engine or not self.engine: if isinstance(engine, Engine): return engine @@ -213,7 +222,7 @@ class DbPlugin(Plugin): query = text(query) if table: - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + table, engine = self._get_table(table, *args, engine=engine, **kwargs) query = table.select() if filter: @@ -240,10 +249,10 @@ class DbPlugin(Plugin): self, table, records, + *args, engine=None, key_columns=None, on_duplicate_update=False, - *args, **kwargs, ): """ @@ -310,7 +319,7 @@ class DbPlugin(Plugin): key_columns = [] engine = self.get_engine(engine, *args, **kwargs) - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + table, engine = self._get_table(table, *args, engine=engine, **kwargs) insert_records = records update_records = [] returned_records = [] @@ -454,7 +463,7 @@ class DbPlugin(Plugin): """ engine = self.get_engine(engine, *args, **kwargs) with engine.connect() as connection: - table, engine = self._get_table(table, engine=engine, *args, **kwargs) + table, engine = self._get_table(table, *args, engine=engine, **kwargs) return self._update(connection, table, records, key_columns) @action @@ -498,7 +507,7 @@ class DbPlugin(Plugin): with engine.connect() as connection: for record in records: - table_, engine = self._get_table(table, engine=engine, *args, **kwargs) + table_, engine = self._get_table(table, *args, engine=engine, **kwargs) delete = table_.delete() for k, v in record.items(): @@ -524,13 +533,19 @@ class DbPlugin(Plugin): with lock, engine.connect() as conn: session_maker = scoped_session( sessionmaker( - expire_on_commit=False, + expire_on_commit=kwargs.get('expire_on_commit', False), autoflush=autoflush, ) ) session_maker.configure(bind=conn) session = session_maker() + + if str(session.connection().engine.url).startswith('sqlite://'): + # SQLite requires foreign_keys to be explicitly enabled + # in order to proper manage cascade deletions + session.execute(text('PRAGMA foreign_keys = ON')) + yield session session.flush() diff --git a/platypush/plugins/entities/__init__.py b/platypush/plugins/entities/__init__.py index a21181d710..b3ef372f3f 100644 --- a/platypush/plugins/entities/__init__.py +++ b/platypush/plugins/entities/__init__.py @@ -4,7 +4,7 @@ from time import time from traceback import format_exception from typing import Optional, Any, Collection, Mapping -from sqlalchemy import or_, text +from sqlalchemy import or_ from sqlalchemy.orm import make_transient, Session from platypush.config import Config @@ -206,11 +206,6 @@ class EntitiesPlugin(Plugin): :return: The payload of the deleted entities. """ with self._get_session(locked=True) as session: - if str(session.connection().engine.url).startswith('sqlite://'): - # SQLite requires foreign_keys to be explicitly enabled - # in order to proper manage cascade deletions - session.execute(text('PRAGMA foreign_keys = ON')) - entities: Collection[Entity] = ( session.query(Entity).filter(Entity.id.in_(entities)).all() ) diff --git a/platypush/plugins/procedures/__init__.py b/platypush/plugins/procedures/__init__.py index d8892e60c3..0f104ed027 100644 --- a/platypush/plugins/procedures/__init__.py +++ b/platypush/plugins/procedures/__init__.py @@ -1,10 +1,27 @@ import json +import re +from contextlib import contextmanager from dataclasses import dataclass -from typing import Callable, Collection, Optional, Union +from multiprocessing import RLock +from random import randint +from typing import ( + Callable, + Collection, + Dict, + Generator, + Iterable, + List, + Optional, + Union, +) + +import yaml +from sqlalchemy.orm import Session from platypush.context import get_plugin from platypush.entities.managers.procedures import ProcedureEntityManager -from platypush.entities.procedures import Procedure +from platypush.entities.procedures import Procedure, ProcedureType +from platypush.message.event.entities import EntityDeleteEvent from platypush.plugins import RunnablePlugin, action from platypush.plugins.db import DbPlugin from platypush.utils import run @@ -23,11 +40,60 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): Utility plugin to run and store procedures as native entities. """ - @action - def exec(self, procedure: str, *args, **kwargs): - return run(f'procedure.{procedure}', *args, **kwargs) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._status_lock = RLock() + + @action + def exec(self, procedure: Union[str, dict], *args, **kwargs): + """ + Execute a procedure. + + :param procedure: Procedure name or definition. If a string is passed, + then the procedure will be looked up by name in the configured + procedures. If a dictionary is passed, then it should be a valid + procedure definition with at least the ``actions`` key. + :param args: Optional arguments to be passed to the procedure. + :param kwargs: Optional arguments to be passed to the procedure. + """ + if isinstance(procedure, str): + return run(f'procedure.{procedure}', *args, **kwargs) + + assert isinstance(procedure, dict), 'Invalid procedure definition' + procedure_name = procedure.get( + 'name', f'procedure_{f"{randint(0, 1 << 32):08x}"}' + ) + + actions = procedure.get('actions') + assert actions and isinstance( + actions, (list, tuple, set) + ), 'Procedure definition should have at least the "actions" key as a list of actions' + + try: + # Create a temporary procedure definition and execute it + self._all_procedures[procedure_name] = { + 'name': procedure_name, + 'type': ProcedureType.CONFIG.value, + 'actions': list(actions), + 'args': procedure.get('args', []), + '_async': False, + } + + kwargs = { + **procedure.get('args', {}), + **kwargs, + } + + return self.exec(procedure_name, *args, **kwargs) + finally: + self._all_procedures.pop(procedure_name, None) + + def _convert_procedure( + self, name: str, proc: Union[dict, Callable, Procedure] + ) -> Procedure: + if isinstance(proc, Procedure): + return proc - def _convert_procedure(self, name: str, proc: Union[dict, Callable]) -> Procedure: metadata = self._serialize_procedure(proc, name=name) return Procedure( id=name, @@ -39,11 +105,19 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): line=metadata.get('line'), args=metadata.get('args', []), actions=metadata.get('actions', []), + meta=metadata.get('meta', {}), ) @action - def status(self, *_, **__): + def status(self, *_, publish: bool = True, **__): """ + :param publish: If set to True (default) then the + :class:`platypush.message.event.entities.EntityUpdateEvent` events + will be published to the bus with the current configured procedures. + Usually this should be set to True, unless you're calling this method + from a context where you first want to retrieve the procedures and + then immediately modify them. In such cases, the published events may + result in race conditions on the entities engine. :return: The serialized configured procedures. Format: .. code-block:: json @@ -59,14 +133,217 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): } """ - self.publish_entities(self._get_wrapped_procedures()) - return self._get_serialized_procedures() + with self._status_lock: + self._sync_db_procedures() + if publish: + self.publish_entities(self._get_wrapped_procedures()) + + return self._get_serialized_procedures() + + @action + def save( + self, + name: str, + actions: Iterable[dict], + args: Optional[Iterable[str]] = None, + old_name: Optional[str] = None, + meta: Optional[dict] = None, + **_, + ): + """ + Save a procedure. + + :param name: Name of the procedure. + :param actions: Definition of the actions to be executed. Format: + + .. code-block:: json + + [ + { + "action": "logger.info", + "args": { + "msg": "Hello, world!" + } + } + ] + + :param args: Optional list of arguments to be passed to the procedure, + as a list of strings with the argument names. + :param old_name: Optional old name of the procedure if it's being + renamed. + :param meta: Optional metadata to be stored with the procedure. Example: + + .. code-block:: json + + { + "icon": { + "class": "fas fa-cogs", + "color": "#00ff00" + } + } + + """ + assert name, 'Procedure name cannot be empty' + assert actions, 'Procedure actions cannot be empty' + + args = args or [] + proc_def = self._all_procedures.get(name, {}) + proc_args = { + 'name': name, + 'type': ProcedureType.DB.value, + 'actions': actions, + 'args': args, + 'meta': ( + meta or (proc_def.get('meta', {}) if isinstance(proc_def, dict) else {}) + ), + } + + def _on_entity_saved(*_, **__): + self._all_procedures[name] = proc_args + + with self._status_lock: + with self._db_session() as session: + if old_name and old_name != name: + try: + self._delete(old_name, session=session) + except AssertionError as e: + self.logger.warning( + 'Error while deleting old procedure: name=%s: %s', + old_name, + e, + ) + + self.publish_entities( + [_ProcedureWrapper(name=name, obj=proc_args)], + callback=_on_entity_saved, + ) + + return self.status(publish=False) + + @action + def delete(self, name: str): + """ + Delete a procedure by name. + + Note that this is only possible for procedures that are stored on the + database. Procedures that are loaded from Python scripts or + configuration files should be removed from the source file. + + :param name: Name of the procedure to be deleted. + """ + with self._db_session() as session: + self._delete(name, session=session) + + self.status() + + @action + def to_yaml(self, procedure: Union[str, dict]) -> str: + """ + Serialize a procedure to YAML. + + This method is useful to export a procedure to a file. + Note that it only works with either YAML-based procedures or + database-stored procedures: Python procedures can't be converted to + YAML. + + :param procedure: Procedure name or definition. If a string is passed, + then the procedure will be looked up by name in the configured + procedures. If a dictionary is passed, then it should be a valid + procedure definition with at least the ``actions`` and ``name`` + keys. + :return: The serialized procedure in YAML format. + """ + if isinstance(procedure, str): + proc = self._all_procedures.get(procedure) + assert proc, f'Procedure {proc} not found' + elif isinstance(procedure, dict): + name = self._normalize_name(procedure.get('name')) + assert name, 'Procedure name cannot be empty' + + actions = procedure.get('actions', []) + assert actions and isinstance( + actions, (list, tuple, set) + ), 'Procedure definition should have at least the "actions" key as a list of actions' + + args = [self._normalize_name(arg) for arg in procedure.get('args', [])] + proc = { + f'procedure.{name}' + + (f'({", ".join(args)})' if args else ''): [ + self._serialize_action(action) for action in actions + ] + } + else: + raise AssertionError( + f'Invalid procedure definition with type {type(procedure)}' + ) + + return yaml.safe_dump(proc, default_flow_style=False, indent=2) + + @staticmethod + def _normalize_name(name: Optional[str]) -> str: + return re.sub(r'[^\w.]+', '_', (name or '').strip(' .')) + + @classmethod + def _serialize_action(cls, data: Union[Iterable, Dict]) -> Union[Dict, List, str]: + if isinstance(data, dict): + name = data.get('action', data.get('name')) + if name: + return { + 'action': name, + **({'args': data['args']} if data.get('args') else {}), + } + + return { + k: ( + cls._serialize_action(v) + if isinstance(v, (dict, list, tuple)) + else v + ) + for k, v in data.items() + } + elif isinstance(data, str): + return data + else: + return [cls._serialize_action(item) for item in data if item is not None] + + @contextmanager + def _db_session(self) -> Generator[Session, None, None]: + db: Optional[DbPlugin] = get_plugin(DbPlugin) + assert db, 'No database plugin configured' + with db.get_session(locked=True) as session: + assert isinstance(session, Session) + yield session + + if session.is_active: + session.commit() + else: + session.rollback() + + def _delete(self, name: str, session: Session): + assert name, 'Procedure name cannot be empty' + proc_row: Procedure = ( + session.query(Procedure).filter(Procedure.name == name).first() + ) + + assert proc_row, f'Procedure {name} not found in the database' + assert proc_row.procedure_type == ProcedureType.DB.value, ( # type: ignore[attr-defined] + f'Procedure {name} is not stored in the database, ' + f'it should be removed from the source file' + ) + + session.delete(proc_row) + self._all_procedures.pop(name, None) + self._bus.post(EntityDeleteEvent(plugin=self, entity=proc_row)) def transform_entities( self, entities: Collection[_ProcedureWrapper], **_ ) -> Collection[Procedure]: return [ - self._convert_procedure(name=proc.name, proc=proc.obj) for proc in entities + self._convert_procedure( + name=proc.name, + proc=proc if isinstance(proc, Procedure) else proc.obj, + ) + for proc in entities ] def _get_wrapped_procedures(self) -> Collection[_ProcedureWrapper]: @@ -76,22 +353,40 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager): ] def _sync_db_procedures(self): - cur_proc_names = set(self._all_procedures.keys()) - db: Optional[DbPlugin] = get_plugin(DbPlugin) - assert db, 'No database plugin configured' + with self._status_lock: + cur_proc_names = set(self._all_procedures.keys()) + with self._db_session() as session: + saved_procs = { + str(proc.name): proc for proc in session.query(Procedure).all() + } - with db.get_session( - autoflush=False, autocommit=False, expire_on_commit=False - ) as session: - procs_to_remove = ( - session.query(Procedure) - .filter(Procedure.name.not_in(cur_proc_names)) - .all() - ) + procs_to_remove = [ + proc + for name, proc in saved_procs.items() + if name not in cur_proc_names + and proc.procedure_type != ProcedureType.DB.value # type: ignore[attr-defined] + ] - for proc in procs_to_remove: - self.logger.info('Removing stale procedure record for %s', proc.name) - session.delete(proc) + for proc in procs_to_remove: + self.logger.info( + 'Removing stale procedure record for %s', proc.name + ) + session.delete(proc) + + procs_to_add = [ + proc + for proc in saved_procs.values() + if proc.procedure_type == ProcedureType.DB.value # type: ignore[attr-defined] + ] + + for proc in procs_to_add: + self._all_procedures[str(proc.name)] = { + 'type': proc.procedure_type, + 'name': proc.name, + 'args': proc.args, + 'actions': proc.actions, + 'meta': proc.meta, + } @staticmethod def _serialize_procedure( diff --git a/platypush/plugins/procedures/_serialize.py b/platypush/plugins/procedures/_serialize.py index aa9a770316..5feb7f9f3e 100644 --- a/platypush/plugins/procedures/_serialize.py +++ b/platypush/plugins/procedures/_serialize.py @@ -8,6 +8,8 @@ class ProcedureEncoder(json.JSONEncoder): """ def default(self, o): + from platypush.entities.procedures import ProcedureType + if callable(o): return { 'type': 'python', @@ -21,4 +23,7 @@ class ProcedureEncoder(json.JSONEncoder): ], } + if isinstance(o, ProcedureType): + return o.value + return super().default(o) diff --git a/platypush/plugins/system/__init__.py b/platypush/plugins/system/__init__.py index 924149b18d..5156681755 100644 --- a/platypush/plugins/system/__init__.py +++ b/platypush/plugins/system/__init__.py @@ -634,13 +634,15 @@ class SystemPlugin(SensorPlugin, EntityManager): if fan.get('id') and fan.get('label') ], *[ - SystemBattery( - id='system:battery', - name='Battery', - **battery, + ( + SystemBattery( + id='system:battery', + name='Battery', + **battery, + ) + if battery + else () ) - if battery - else () ], ] diff --git a/platypush/procedure/__init__.py b/platypush/procedure/__init__.py index 56cdc8f130..0e28b5f1a9 100644 --- a/platypush/procedure/__init__.py +++ b/platypush/procedure/__init__.py @@ -1,10 +1,12 @@ import enum import logging import re +from copy import deepcopy +from dataclasses import dataclass, field from functools import wraps from queue import LifoQueue -from typing import Optional +from typing import Any, Dict, Iterable, List, Optional from ..common import exec_wrapper from ..config import Config @@ -14,7 +16,7 @@ from ..message.response import Response logger = logging.getLogger('platypush') -class Statement(enum.Enum): +class StatementType(enum.Enum): """ Enumerates the possible statements in a procedure. """ @@ -22,6 +24,68 @@ class Statement(enum.Enum): BREAK = 'break' CONTINUE = 'continue' RETURN = 'return' + SET = 'set' + + +@dataclass +class Statement: + """ + Models a statement in a procedure. + """ + + type: StatementType + argument: Optional[Any] = None + + @classmethod + def build(cls, statement: str): + """ + Builds a statement from a string. + """ + + m = re.match(r'\s*return\s*(.*)\s*', statement, re.IGNORECASE) + if m: + return ReturnStatement(argument=m.group(1)) + + return cls(StatementType(statement.lower())) + + def run(self, *_, **__) -> Optional[Any]: + """ + Executes the statement. + """ + + +@dataclass +class ReturnStatement(Statement): + """ + Models a return statement in a procedure. + """ + + type: StatementType = StatementType.RETURN + + def run(self, *_, **context) -> Any: + return Response( + output=Request.expand_value_from_context( + self.argument, **_update_context(context) + ) + ) + + +@dataclass +class SetStatement(Statement): + """ + Models a set variable statement in a procedure. + """ + + type: StatementType = StatementType.SET + vars: dict = field(default_factory=dict) + + def run(self, *_, **context): + vars = deepcopy(self.vars) # pylint: disable=redefined-builtin + for k, v in vars.items(): + vars[k] = Request.expand_value_from_context(v, **context) + + context.update(vars) + return Response(output=vars) class Procedure: @@ -55,7 +119,6 @@ class Procedure: requests, args=None, backend=None, - id=None, # pylint: disable=redefined-builtin procedure_class=None, **kwargs, ): @@ -66,11 +129,32 @@ class Procedure: if_config = LifoQueue() procedure_class = procedure_class or cls key = None + kwargs.pop('id', None) for request_config in requests: # Check if it's a break/continue/return statement if isinstance(request_config, str): - reqs.append(Statement(request_config)) + cls._flush_if_statements(reqs, if_config) + reqs.append(Statement.build(request_config)) + continue + + # Check if it's a return statement with a value + if ( + len(request_config.keys()) == 1 + and list(request_config.keys())[0] == StatementType.RETURN.value + ): + cls._flush_if_statements(reqs, if_config) + reqs.append( + ReturnStatement(argument=request_config[StatementType.RETURN.value]) + ) + continue + + # Check if it's a variable set statement + if (len(request_config.keys()) == 1) and ( + list(request_config.keys())[0] == StatementType.SET.value + ): + cls._flush_if_statements(reqs, if_config) + reqs.append(SetStatement(vars=request_config[StatementType.SET.value])) continue # Check if this request is an if-else @@ -79,6 +163,7 @@ class Procedure: m = re.match(r'\s*(if)\s+\${(.*)}\s*', key) if m: + cls._flush_if_statements(reqs, if_config) if_count += 1 if_name = f'{name}__if_{if_count}' condition = m.group(2) @@ -91,7 +176,6 @@ class Procedure: 'condition': condition, 'else_branch': [], 'backend': backend, - 'id': id, } ) @@ -132,7 +216,6 @@ class Procedure: _async=_async, requests=request_config[key], backend=backend, - id=id, iterator_name=iterator_name, iterable=iterable, ) @@ -156,23 +239,19 @@ class Procedure: requests=request_config[key], condition=condition, backend=backend, - id=id, ) reqs.append(loop) continue request_config['origin'] = Config.get('device_id') - request_config['id'] = id if 'target' not in request_config: request_config['target'] = request_config['origin'] request = Request.build(request_config) reqs.append(request) - while not if_config.empty(): - pending_if = if_config.get() - reqs.append(IfProcedure.build(**pending_if)) + cls._flush_if_statements(reqs, if_config) return procedure_class( name=name, @@ -184,84 +263,101 @@ class Procedure: ) @staticmethod - def _find_nearest_loop(stack): - for proc in stack[::-1]: - if isinstance(proc, LoopProcedure): - return proc - - raise AssertionError('break/continue statement found outside of a loop') + def _flush_if_statements(requests: List, if_config: LifoQueue): + while not if_config.empty(): + pending_if = if_config.get() + requests.append(IfProcedure.build(**pending_if)) # pylint: disable=too-many-branches,too-many-statements - def execute(self, n_tries=1, __stack__=None, **context): + def execute( + self, + n_tries: int = 1, + __stack__: Optional[Iterable] = None, + new_context: Optional[Dict[str, Any]] = None, + **context, + ): """ Execute the requests in the procedure. :param n_tries: Number of tries in case of failure before raising a RuntimeError. """ - if not __stack__: - __stack__ = [self] - else: - __stack__.append(self) + __stack__ = (self,) if not __stack__ else (self, *__stack__) + new_context = new_context or {} if self.args: args = self.args.copy() for k, v in args.items(): - v = Request.expand_value_from_context(v, **context) - args[k] = v - context[k] = v + args[k] = context[k] = Request.expand_value_from_context(v, **context) logger.info('Executing procedure %s with arguments %s', self.name, args) else: logger.info('Executing procedure %s', self.name) response = Response() token = Config.get('token') + context = _update_context(context) + locals().update(context) + # pylint: disable=too-many-nested-blocks for request in self.requests: if callable(request): response = request(**context) continue + context['_async'] = self._async + context['n_tries'] = n_tries + context['__stack__'] = __stack__ + context['new_context'] = new_context + if isinstance(request, Statement): - if request == Statement.RETURN: + if isinstance(request, ReturnStatement): + response = request.run(**context) self._should_return = True for proc in __stack__: proc._should_return = True # pylint: disable=protected-access + break - if request in [Statement.BREAK, Statement.CONTINUE]: - loop = self._find_nearest_loop(__stack__) - if request == Statement.BREAK: - loop._should_break = True # pylint: disable=protected-access - else: - loop._should_continue = True # pylint: disable=protected-access + if isinstance(request, SetStatement): + rs: dict = request.run(**context).output # type: ignore + context.update(rs) + new_context.update(rs) + locals().update(rs) + continue + + if request.type in [StatementType.BREAK, StatementType.CONTINUE]: + for proc in __stack__: + if isinstance(proc, LoopProcedure): + if request.type == StatementType.BREAK: + setattr(proc, '_should_break', True) # noqa: B010 + else: + setattr(proc, '_should_continue', True) # noqa: B010 + break + + proc._should_return = True # pylint: disable=protected-access + break should_continue = getattr(self, '_should_continue', False) should_break = getattr(self, '_should_break', False) - if isinstance(self, LoopProcedure) and (should_continue or should_break): - if should_continue: - self._should_continue = ( # pylint: disable=attribute-defined-outside-init - False - ) - + if self._should_return or should_continue or should_break: break if token and not isinstance(request, Statement): request.token = token - context['_async'] = self._async - context['n_tries'] = n_tries exec_ = getattr(request, 'execute', None) if callable(exec_): - response = exec_(__stack__=__stack__, **context) + response = exec_(**context) + context.update(context.get('new_context', {})) if not self._async and response: if isinstance(response.output, dict): - for k, v in response.output.items(): - context[k] = v + context.update(response.output) context['output'] = response.output context['errors'] = response.errors + new_context.update(context) + locals().update(context) if self._should_return: break @@ -282,10 +378,8 @@ class LoopProcedure(Procedure): Base class while and for/fork loops. """ - def __init__(self, name, requests, _async=False, args=None, backend=None): - super().__init__( - name=name, _async=_async, requests=requests, args=args, backend=backend - ) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) self._should_break = False self._should_continue = False @@ -330,6 +424,9 @@ class ForProcedure(LoopProcedure): # pylint: disable=eval-used def execute(self, *_, **context): + ctx = _update_context(context) + locals().update(ctx) + try: iterable = eval(self.iterable) assert hasattr( @@ -337,11 +434,18 @@ class ForProcedure(LoopProcedure): ), f'Object of type {type(iterable)} is not iterable: {iterable}' except Exception as e: logger.debug('Iterable %s expansion error: %s', self.iterable, e) - iterable = Request.expand_value_from_context(self.iterable, **context) + iterable = Request.expand_value_from_context(self.iterable, **ctx) response = Response() for item in iterable: + ctx[self.iterator_name] = item + response = super().execute(**ctx) + ctx.update(ctx.get('new_context', {})) + + if response.output and isinstance(response.output, dict): + ctx = _update_context(ctx, **response.output) + if self._should_return: logger.info('Returning from %s', self.name) break @@ -356,9 +460,6 @@ class ForProcedure(LoopProcedure): logger.info('Breaking loop %s', self.name) break - context[self.iterator_name] = item - response = super().execute(**context) - return response @@ -395,41 +496,23 @@ class WhileProcedure(LoopProcedure): ) self.condition = condition - @staticmethod - def _get_context(**context): - for k, v in context.items(): - try: - context[k] = eval(v) # pylint: disable=eval-used - except Exception as e: - logger.debug('Evaluation error for %s=%s: %s', k, v, e) - if isinstance(v, str): - try: - context[k] = eval( # pylint: disable=eval-used - '"' + re.sub(r'(^|[^\\])"', '\1\\"', v) + '"' - ) - except Exception as ee: - logger.warning( - 'Could not parse value for context variable %s=%s: %s', - k, - v, - ee, - ) - logger.warning('Context: %s', context) - logger.exception(e) - - return context - def execute(self, *_, **context): response = Response() - context = self._get_context(**context) - for k, v in context.items(): - locals()[k] = v + ctx = _update_context(context) + locals().update(ctx) while True: condition_true = eval(self.condition) # pylint: disable=eval-used if not condition_true: break + response = super().execute(**ctx) + ctx.update(ctx.get('new_context', {})) + if response.output and isinstance(response.output, dict): + _update_context(ctx, **response.output) + + locals().update(ctx) + if self._should_return: logger.info('Returning from %s', self.name) break @@ -444,13 +527,6 @@ class WhileProcedure(LoopProcedure): logger.info('Breaking loop %s', self.name) break - response = super().execute(**context) - - if response.output and isinstance(response.output, dict): - new_context = self._get_context(**response.output) - for k, v in new_context.items(): - locals()[k] = v - return response @@ -544,20 +620,28 @@ class IfProcedure(Procedure): ) def execute(self, *_, **context): - for k, v in context.items(): - locals()[k] = v - + ctx = _update_context(context) + locals().update(ctx) condition_true = eval(self.condition) # pylint: disable=eval-used response = Response() if condition_true: - response = super().execute(**context) + response = super().execute(**ctx) elif self.else_branch: - response = self.else_branch.execute(**context) + response = self.else_branch.execute(**ctx) return response +def _update_context(context: Optional[Dict[str, Any]] = None, **kwargs): + ctx = context or {} + ctx = {**ctx.get('context', {}), **ctx, **kwargs} + for k, v in ctx.items(): + ctx[k] = Request.expand_value_from_context(v, **ctx) + + return ctx + + def procedure(name_or_func: Optional[str] = None, *upper_args, **upper_kwargs): name = name_or_func if isinstance(name_or_func, str) else None