Merge pull request '[Backend] Expose procedures as entities' (#426) from 341/procedure-entities into master

Reviewed-on: platypush/platypush#426
This commit is contained in:
Fabio Manganiello 2024-09-23 03:45:30 +02:00
commit 62737b5a95
17 changed files with 851 additions and 398 deletions

View file

@ -6,9 +6,18 @@
ssh-keyscan github.com >> ~/.ssh/known_hosts 2>/dev/null
# Clone the repository
branch=$(git rev-parse --abbrev-ref HEAD)
if [ -z "${branch}" ]; then
echo "No branch checked out"
exit 1
fi
git remote add github git@github.com:/blacklight/platypush.git
git pull --rebase github "$(git branch | head -1 | awk '{print $2}')" || echo "No such branch on Github"
if (( "$branch" == "master" )); then
git pull --rebase github "${branch}" || echo "No such branch on Github"
fi
# Push the changes to the GitHub mirror
git push --all -v github
git push -f --all -v github
git push --tags -v github

View file

@ -1,5 +1,44 @@
# Changelog
## [Unreleased]
- [[#333](https://git.platypush.tech/platypush/platypush/issues/333)]: new file
browser UI/component. It includes custom MIME type support, a file editor
with syntax highlight, file download and file upload.
- [[#341](https://git.platypush.tech/platypush/platypush/issues/341)]:
procedures are now native entities that can be managed from the entities panel.
A new versatile procedure editor has also been added, with support for nested
blocks, conditions, loops, variables, context autocomplete, and more.
- [`procedure`]: Added the following features to YAML/structured procedures:
- `set`: to set variables whose scope is limited to the procedure / code
block where they are created. `variable.set` is useful to permanently
store variables on the db, `variable.mset` is useful to set temporary
global variables in memory through Redis, but sometimes you may just want
to assign a value to a variable that only needs to live within a procedure,
event hook or cron.
```yaml
- set:
foo: bar
temperature: ${output.get('temperature')}
```
- `return` can now return values too when invoked within a procedure:
```yaml
- return: something
# Or
- return: "Result: ${output.get('response')}"
```
- The default logging format is now much more compact. The full body of events
and requests is no longer included by default in `info` mode - instead, a
summary with the message type, ID and response time is logged. The full
payloads can still be logged by enabling `debug` logs through e.g. `-v`.
## [1.2.3]
- [[#422](https://git.platypush.tech/platypush/platypush/issues/422)]: adapted

View file

@ -365,7 +365,13 @@ class Application:
elif isinstance(msg, Response):
msg.log()
elif isinstance(msg, Event):
msg.log()
log.info(
'Received event: %s.%s[id=%s]',
msg.__class__.__module__,
msg.__class__.__name__,
msg.id,
)
msg.log(level=logging.DEBUG)
self.event_processor.process_event(msg)
return _f

View file

@ -179,6 +179,8 @@ class Config:
self._config['logging'] = logging_config
def _init_db(self, db: Optional[str] = None):
self._config['_db'] = self._config.get('db', {})
# If the db connection string is passed as an argument, use it
if db:
self._config['db'] = {

View file

@ -1,8 +1,9 @@
import logging
from enum import Enum
from sqlalchemy import (
Column,
Enum,
Enum as DbEnum,
ForeignKey,
Integer,
JSON,
@ -16,6 +17,12 @@ from . import Entity
logger = logging.getLogger(__name__)
class ProcedureType(Enum):
PYTHON = 'python'
CONFIG = 'config'
DB = 'db'
if not is_defined('procedure'):
class Procedure(Entity):
@ -30,7 +37,13 @@ if not is_defined('procedure'):
)
args = Column(JSON, nullable=False, default=[])
procedure_type = Column(
Enum('python', 'config', name='procedure_type'), nullable=False
DbEnum(
*[m.value for m in ProcedureType.__members__.values()],
name='procedure_type',
create_constraint=True,
validate_strings=True,
),
nullable=False,
)
module = Column(String)
source = Column(String)

View file

@ -1,25 +1,19 @@
from sqlalchemy import Boolean, Column, Float, ForeignKey, Integer, JSON, String
from platypush.common.db import is_defined
from . import Entity
from .devices import Device
from .sensors import NumericSensor, PercentSensor
from .temperature import TemperatureSensor
if not is_defined('cpu'):
class Cpu(Entity):
class Cpu(Entity):
"""
``CPU`` ORM (container) model.
"""
__tablename__ = 'cpu'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
percent = Column(Float)
@ -29,18 +23,14 @@ if not is_defined('cpu'):
}
if not is_defined('cpu_info'):
class CpuInfo(Entity):
class CpuInfo(Entity):
"""
``CpuInfo`` ORM model.
"""
__tablename__ = 'cpu_info'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
architecture = Column(String)
bits = Column(Integer)
@ -61,18 +51,14 @@ if not is_defined('cpu_info'):
}
if not is_defined('cpu_times'):
class CpuTimes(Entity):
class CpuTimes(Entity):
"""
``CpuTimes`` ORM (container) model.
"""
__tablename__ = 'cpu_times'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
__table_args__ = {'extend_existing': True}
__mapper_args__ = {
@ -80,18 +66,14 @@ if not is_defined('cpu_times'):
}
if not is_defined('cpu_stats'):
class CpuStats(Entity):
class CpuStats(Entity):
"""
``CpuStats`` ORM (container) model.
"""
__tablename__ = 'cpu_stats'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
__table_args__ = {'extend_existing': True}
__mapper_args__ = {
@ -99,18 +81,14 @@ if not is_defined('cpu_stats'):
}
if not is_defined('memory_stats'):
class MemoryStats(Entity):
class MemoryStats(Entity):
"""
``MemoryStats`` ORM model.
"""
__tablename__ = 'memory_stats'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
total = Column(Integer)
available = Column(Integer)
@ -129,18 +107,14 @@ if not is_defined('memory_stats'):
}
if not is_defined('swap_stats'):
class SwapStats(Entity):
class SwapStats(Entity):
"""
``SwapStats`` ORM model.
"""
__tablename__ = 'swap_stats'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
total = Column(Integer)
used = Column(Integer)
@ -153,18 +127,14 @@ if not is_defined('swap_stats'):
}
if not is_defined('disk'):
class Disk(Entity):
class Disk(Entity):
"""
``Disk`` ORM model.
"""
__tablename__ = 'disk'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True)
mountpoint = Column(String)
fstype = Column(String)
@ -187,18 +157,14 @@ if not is_defined('disk'):
}
if not is_defined('network_interface'):
class NetworkInterface(Device):
class NetworkInterface(Device):
"""
``NetworkInterface`` ORM model.
"""
__tablename__ = 'network_interface'
id = Column(
Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True
)
id = Column(Integer, ForeignKey(Device.id, ondelete='CASCADE'), primary_key=True)
bytes_sent = Column(Integer)
bytes_recv = Column(Integer)
@ -220,9 +186,7 @@ if not is_defined('network_interface'):
}
if not is_defined('system_temperature'):
class SystemTemperature(TemperatureSensor):
class SystemTemperature(TemperatureSensor):
"""
Extends the ``TemperatureSensor``.
"""
@ -244,9 +208,7 @@ if not is_defined('system_temperature'):
}
if not is_defined('system_fan'):
class SystemFan(NumericSensor):
class SystemFan(NumericSensor):
"""
``SystemFan`` ORM model.
"""
@ -265,9 +227,7 @@ if not is_defined('system_fan'):
}
if not is_defined('system_battery'):
class SystemBattery(PercentSensor):
class SystemBattery(PercentSensor):
"""
``SystemBattery`` ORM model.
"""

View file

@ -8,7 +8,7 @@ import logging
import inspect
import json
import time
from typing import Union
from typing import Optional, Union
from uuid import UUID
_logger = logging.getLogger('platypush')
@ -114,18 +114,19 @@ class Message:
self._logger = _logger
self._default_log_prefix = ''
def log(self, prefix=''):
def log(self, level: Optional[int] = None, prefix=''):
if self.logging_level is None:
return # Skip logging
log_func = self._logger.info
if self.logging_level == logging.DEBUG:
level = level if level is not None else self.logging_level
if level == logging.DEBUG:
log_func = self._logger.debug
elif self.logging_level == logging.WARNING:
elif level == logging.WARNING:
log_func = self._logger.warning
elif self.logging_level == logging.ERROR:
elif level == logging.ERROR:
log_func = self._logger.error
elif self.logging_level == logging.FATAL:
elif level == logging.FATAL:
log_func = self._logger.fatal
if not prefix:

View file

@ -64,12 +64,13 @@ class Request(Message):
msg = super().parse(msg)
args = {
'target': msg.get('target', Config.get('device_id')),
'action': msg['action'],
'action': msg.get('action', msg.get('name')),
'args': msg.get('args', {}),
'id': msg['id'] if 'id' in msg else cls._generate_id(),
'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(),
}
assert args.get('action'), 'No action specified in the request'
if 'origin' in msg:
args['origin'] = msg['origin']
if 'token' in msg:
@ -100,7 +101,7 @@ class Request(Message):
proc = Procedure.build(
name=proc_name,
requests=proc_config['actions'],
_async=proc_config['_async'],
_async=proc_config.get('_async', False),
args=self.args,
backend=self.backend,
id=self.id,
@ -165,6 +166,11 @@ class Request(Message):
context_value = [*context_value]
if isinstance(context_value, datetime.date):
context_value = context_value.isoformat()
except NameError as e:
logger.warning(
'Could not expand expression "%s": %s', inner_expr, e
)
context_value = expr
except Exception as e:
logger.exception(e)
context_value = expr
@ -188,7 +194,13 @@ class Request(Message):
response.id = self.id
response.target = self.origin
response.origin = Config.get('device_id')
response.log()
self._logger.info(
'Sending response to request[id=%s, action=%s], response_time=%.02fs',
self.id,
self.action,
response.timestamp - self.timestamp,
)
response.log(level=logging.DEBUG)
if self.backend and self.origin:
self.backend.send_response(response=response, request=self)
@ -221,7 +233,10 @@ class Request(Message):
from platypush.plugins import RunnablePlugin
response = None
self.log()
self._logger.info(
'Executing request[id=%s, action=%s]', self.id, self.action
)
self.log(level=logging.DEBUG)
try:
if self.action.startswith('procedure.'):

View file

@ -1,6 +1,7 @@
import json
import logging
import time
from typing import Optional
from platypush.message import Message
@ -99,8 +100,11 @@ class Response(Message):
return json.dumps(response_dict, cls=self.Encoder)
def log(self, *args, **kwargs):
self.logging_level = logging.WARNING if self.is_error() else logging.INFO
def log(self, *args, level: Optional[int] = None, **kwargs):
if level is None:
level = logging.WARNING if self.is_error() else logging.INFO
kwargs['level'] = level
super().log(*args, **kwargs)

View file

@ -179,13 +179,13 @@ class AlarmPlugin(RunnablePlugin, EntityManager):
else:
# If the alarm record on the db is static, but the alarm is no
# longer present in the configuration, then we want to delete it
if alarm.static:
if bool(alarm.static):
self._clear_alarm(alarm, session)
else:
self.alarms[name] = Alarm.from_db(
alarm,
stop_event=self._should_stop,
media_plugin=alarm.media_plugin or self.media_plugin,
media_plugin=str(alarm.media_plugin) or self.media_plugin,
on_change=self._on_alarm_update,
)
@ -215,7 +215,12 @@ class AlarmPlugin(RunnablePlugin, EntityManager):
def _clear_alarm(self, alarm: DbAlarm, session: Session):
alarm_obj = self.alarms.pop(str(alarm.name), None)
if alarm_obj:
try:
alarm_obj.stop()
except Exception as e:
self.logger.warning(
f'Error while stopping alarm {alarm.name}: {e}', exc_info=True
)
session.delete(alarm)
self._bus.post(EntityDeleteEvent(entity=alarm))
@ -439,15 +444,15 @@ class AlarmPlugin(RunnablePlugin, EntityManager):
when=when or alarm.when,
media=media or alarm.media,
media_plugin=media_plugin or alarm.media_plugin or self.media_plugin,
media_repeat=media_repeat
if media_repeat is not None
else alarm.media_repeat,
media_repeat=(
media_repeat if media_repeat is not None else alarm.media_repeat
),
actions=actions if actions is not None else (alarm.actions or []),
name=new_name or name,
enabled=enabled if enabled is not None else alarm.is_enabled(),
audio_volume=audio_volume
if audio_volume is not None
else alarm.audio_volume,
audio_volume=(
audio_volume if audio_volume is not None else alarm.audio_volume
),
snooze_interval=snooze_interval or alarm.snooze_interval,
dismiss_interval=dismiss_interval or alarm.dismiss_interval,
).to_dict()

View file

@ -274,6 +274,9 @@ class Alarm:
if self.audio_volume is not None:
self._get_media_plugin().set_volume(self.audio_volume)
if not self.media:
return
audio_thread = threading.Thread(target=thread)
audio_thread.start()

View file

@ -40,8 +40,13 @@ class DbPlugin(Plugin):
(see https:///docs.sqlalchemy.org/en/latest/core/engines.html)
"""
super().__init__()
self.engine_url = engine
from platypush.config import Config
kwargs.update(Config.get('_db', {}))
super().__init__(*args, **kwargs)
self.engine_url = engine or kwargs.pop('engine', None)
self.args = args
self.kwargs = kwargs
self.engine = self.get_engine(engine, *args, **kwargs)
def get_engine(
@ -50,6 +55,10 @@ class DbPlugin(Plugin):
if engine == self.engine_url and self.engine:
return self.engine
if not args:
args = self.args
kwargs = {**self.kwargs, **kwargs}
if engine or not self.engine:
if isinstance(engine, Engine):
return engine
@ -213,7 +222,7 @@ class DbPlugin(Plugin):
query = text(query)
if table:
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
table, engine = self._get_table(table, *args, engine=engine, **kwargs)
query = table.select()
if filter:
@ -240,10 +249,10 @@ class DbPlugin(Plugin):
self,
table,
records,
*args,
engine=None,
key_columns=None,
on_duplicate_update=False,
*args,
**kwargs,
):
"""
@ -310,7 +319,7 @@ class DbPlugin(Plugin):
key_columns = []
engine = self.get_engine(engine, *args, **kwargs)
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
table, engine = self._get_table(table, *args, engine=engine, **kwargs)
insert_records = records
update_records = []
returned_records = []
@ -454,7 +463,7 @@ class DbPlugin(Plugin):
"""
engine = self.get_engine(engine, *args, **kwargs)
with engine.connect() as connection:
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
table, engine = self._get_table(table, *args, engine=engine, **kwargs)
return self._update(connection, table, records, key_columns)
@action
@ -498,7 +507,7 @@ class DbPlugin(Plugin):
with engine.connect() as connection:
for record in records:
table_, engine = self._get_table(table, engine=engine, *args, **kwargs)
table_, engine = self._get_table(table, *args, engine=engine, **kwargs)
delete = table_.delete()
for k, v in record.items():
@ -524,13 +533,19 @@ class DbPlugin(Plugin):
with lock, engine.connect() as conn:
session_maker = scoped_session(
sessionmaker(
expire_on_commit=False,
expire_on_commit=kwargs.get('expire_on_commit', False),
autoflush=autoflush,
)
)
session_maker.configure(bind=conn)
session = session_maker()
if str(session.connection().engine.url).startswith('sqlite://'):
# SQLite requires foreign_keys to be explicitly enabled
# in order to proper manage cascade deletions
session.execute(text('PRAGMA foreign_keys = ON'))
yield session
session.flush()

View file

@ -4,7 +4,7 @@ from time import time
from traceback import format_exception
from typing import Optional, Any, Collection, Mapping
from sqlalchemy import or_, text
from sqlalchemy import or_
from sqlalchemy.orm import make_transient, Session
from platypush.config import Config
@ -206,11 +206,6 @@ class EntitiesPlugin(Plugin):
:return: The payload of the deleted entities.
"""
with self._get_session(locked=True) as session:
if str(session.connection().engine.url).startswith('sqlite://'):
# SQLite requires foreign_keys to be explicitly enabled
# in order to proper manage cascade deletions
session.execute(text('PRAGMA foreign_keys = ON'))
entities: Collection[Entity] = (
session.query(Entity).filter(Entity.id.in_(entities)).all()
)

View file

@ -1,10 +1,27 @@
import json
import re
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Callable, Collection, Optional, Union
from multiprocessing import RLock
from random import randint
from typing import (
Callable,
Collection,
Dict,
Generator,
Iterable,
List,
Optional,
Union,
)
import yaml
from sqlalchemy.orm import Session
from platypush.context import get_plugin
from platypush.entities.managers.procedures import ProcedureEntityManager
from platypush.entities.procedures import Procedure
from platypush.entities.procedures import Procedure, ProcedureType
from platypush.message.event.entities import EntityDeleteEvent
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.db import DbPlugin
from platypush.utils import run
@ -23,11 +40,60 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
Utility plugin to run and store procedures as native entities.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._status_lock = RLock()
@action
def exec(self, procedure: str, *args, **kwargs):
def exec(self, procedure: Union[str, dict], *args, **kwargs):
"""
Execute a procedure.
:param procedure: Procedure name or definition. If a string is passed,
then the procedure will be looked up by name in the configured
procedures. If a dictionary is passed, then it should be a valid
procedure definition with at least the ``actions`` key.
:param args: Optional arguments to be passed to the procedure.
:param kwargs: Optional arguments to be passed to the procedure.
"""
if isinstance(procedure, str):
return run(f'procedure.{procedure}', *args, **kwargs)
def _convert_procedure(self, name: str, proc: Union[dict, Callable]) -> Procedure:
assert isinstance(procedure, dict), 'Invalid procedure definition'
procedure_name = procedure.get(
'name', f'procedure_{f"{randint(0, 1 << 32):08x}"}'
)
actions = procedure.get('actions')
assert actions and isinstance(
actions, (list, tuple, set)
), 'Procedure definition should have at least the "actions" key as a list of actions'
try:
# Create a temporary procedure definition and execute it
self._all_procedures[procedure_name] = {
'name': procedure_name,
'type': ProcedureType.CONFIG.value,
'actions': list(actions),
'args': procedure.get('args', []),
'_async': False,
}
kwargs = {
**procedure.get('args', {}),
**kwargs,
}
return self.exec(procedure_name, *args, **kwargs)
finally:
self._all_procedures.pop(procedure_name, None)
def _convert_procedure(
self, name: str, proc: Union[dict, Callable, Procedure]
) -> Procedure:
if isinstance(proc, Procedure):
return proc
metadata = self._serialize_procedure(proc, name=name)
return Procedure(
id=name,
@ -39,11 +105,19 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
line=metadata.get('line'),
args=metadata.get('args', []),
actions=metadata.get('actions', []),
meta=metadata.get('meta', {}),
)
@action
def status(self, *_, **__):
def status(self, *_, publish: bool = True, **__):
"""
:param publish: If set to True (default) then the
:class:`platypush.message.event.entities.EntityUpdateEvent` events
will be published to the bus with the current configured procedures.
Usually this should be set to True, unless you're calling this method
from a context where you first want to retrieve the procedures and
then immediately modify them. In such cases, the published events may
result in race conditions on the entities engine.
:return: The serialized configured procedures. Format:
.. code-block:: json
@ -59,14 +133,217 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
}
"""
with self._status_lock:
self._sync_db_procedures()
if publish:
self.publish_entities(self._get_wrapped_procedures())
return self._get_serialized_procedures()
@action
def save(
self,
name: str,
actions: Iterable[dict],
args: Optional[Iterable[str]] = None,
old_name: Optional[str] = None,
meta: Optional[dict] = None,
**_,
):
"""
Save a procedure.
:param name: Name of the procedure.
:param actions: Definition of the actions to be executed. Format:
.. code-block:: json
[
{
"action": "logger.info",
"args": {
"msg": "Hello, world!"
}
}
]
:param args: Optional list of arguments to be passed to the procedure,
as a list of strings with the argument names.
:param old_name: Optional old name of the procedure if it's being
renamed.
:param meta: Optional metadata to be stored with the procedure. Example:
.. code-block:: json
{
"icon": {
"class": "fas fa-cogs",
"color": "#00ff00"
}
}
"""
assert name, 'Procedure name cannot be empty'
assert actions, 'Procedure actions cannot be empty'
args = args or []
proc_def = self._all_procedures.get(name, {})
proc_args = {
'name': name,
'type': ProcedureType.DB.value,
'actions': actions,
'args': args,
'meta': (
meta or (proc_def.get('meta', {}) if isinstance(proc_def, dict) else {})
),
}
def _on_entity_saved(*_, **__):
self._all_procedures[name] = proc_args
with self._status_lock:
with self._db_session() as session:
if old_name and old_name != name:
try:
self._delete(old_name, session=session)
except AssertionError as e:
self.logger.warning(
'Error while deleting old procedure: name=%s: %s',
old_name,
e,
)
self.publish_entities(
[_ProcedureWrapper(name=name, obj=proc_args)],
callback=_on_entity_saved,
)
return self.status(publish=False)
@action
def delete(self, name: str):
"""
Delete a procedure by name.
Note that this is only possible for procedures that are stored on the
database. Procedures that are loaded from Python scripts or
configuration files should be removed from the source file.
:param name: Name of the procedure to be deleted.
"""
with self._db_session() as session:
self._delete(name, session=session)
self.status()
@action
def to_yaml(self, procedure: Union[str, dict]) -> str:
"""
Serialize a procedure to YAML.
This method is useful to export a procedure to a file.
Note that it only works with either YAML-based procedures or
database-stored procedures: Python procedures can't be converted to
YAML.
:param procedure: Procedure name or definition. If a string is passed,
then the procedure will be looked up by name in the configured
procedures. If a dictionary is passed, then it should be a valid
procedure definition with at least the ``actions`` and ``name``
keys.
:return: The serialized procedure in YAML format.
"""
if isinstance(procedure, str):
proc = self._all_procedures.get(procedure)
assert proc, f'Procedure {proc} not found'
elif isinstance(procedure, dict):
name = self._normalize_name(procedure.get('name'))
assert name, 'Procedure name cannot be empty'
actions = procedure.get('actions', [])
assert actions and isinstance(
actions, (list, tuple, set)
), 'Procedure definition should have at least the "actions" key as a list of actions'
args = [self._normalize_name(arg) for arg in procedure.get('args', [])]
proc = {
f'procedure.{name}'
+ (f'({", ".join(args)})' if args else ''): [
self._serialize_action(action) for action in actions
]
}
else:
raise AssertionError(
f'Invalid procedure definition with type {type(procedure)}'
)
return yaml.safe_dump(proc, default_flow_style=False, indent=2)
@staticmethod
def _normalize_name(name: Optional[str]) -> str:
return re.sub(r'[^\w.]+', '_', (name or '').strip(' .'))
@classmethod
def _serialize_action(cls, data: Union[Iterable, Dict]) -> Union[Dict, List, str]:
if isinstance(data, dict):
name = data.get('action', data.get('name'))
if name:
return {
'action': name,
**({'args': data['args']} if data.get('args') else {}),
}
return {
k: (
cls._serialize_action(v)
if isinstance(v, (dict, list, tuple))
else v
)
for k, v in data.items()
}
elif isinstance(data, str):
return data
else:
return [cls._serialize_action(item) for item in data if item is not None]
@contextmanager
def _db_session(self) -> Generator[Session, None, None]:
db: Optional[DbPlugin] = get_plugin(DbPlugin)
assert db, 'No database plugin configured'
with db.get_session(locked=True) as session:
assert isinstance(session, Session)
yield session
if session.is_active:
session.commit()
else:
session.rollback()
def _delete(self, name: str, session: Session):
assert name, 'Procedure name cannot be empty'
proc_row: Procedure = (
session.query(Procedure).filter(Procedure.name == name).first()
)
assert proc_row, f'Procedure {name} not found in the database'
assert proc_row.procedure_type == ProcedureType.DB.value, ( # type: ignore[attr-defined]
f'Procedure {name} is not stored in the database, '
f'it should be removed from the source file'
)
session.delete(proc_row)
self._all_procedures.pop(name, None)
self._bus.post(EntityDeleteEvent(plugin=self, entity=proc_row))
def transform_entities(
self, entities: Collection[_ProcedureWrapper], **_
) -> Collection[Procedure]:
return [
self._convert_procedure(name=proc.name, proc=proc.obj) for proc in entities
self._convert_procedure(
name=proc.name,
proc=proc if isinstance(proc, Procedure) else proc.obj,
)
for proc in entities
]
def _get_wrapped_procedures(self) -> Collection[_ProcedureWrapper]:
@ -76,23 +353,41 @@ class ProceduresPlugin(RunnablePlugin, ProcedureEntityManager):
]
def _sync_db_procedures(self):
with self._status_lock:
cur_proc_names = set(self._all_procedures.keys())
db: Optional[DbPlugin] = get_plugin(DbPlugin)
assert db, 'No database plugin configured'
with self._db_session() as session:
saved_procs = {
str(proc.name): proc for proc in session.query(Procedure).all()
}
with db.get_session(
autoflush=False, autocommit=False, expire_on_commit=False
) as session:
procs_to_remove = (
session.query(Procedure)
.filter(Procedure.name.not_in(cur_proc_names))
.all()
)
procs_to_remove = [
proc
for name, proc in saved_procs.items()
if name not in cur_proc_names
and proc.procedure_type != ProcedureType.DB.value # type: ignore[attr-defined]
]
for proc in procs_to_remove:
self.logger.info('Removing stale procedure record for %s', proc.name)
self.logger.info(
'Removing stale procedure record for %s', proc.name
)
session.delete(proc)
procs_to_add = [
proc
for proc in saved_procs.values()
if proc.procedure_type == ProcedureType.DB.value # type: ignore[attr-defined]
]
for proc in procs_to_add:
self._all_procedures[str(proc.name)] = {
'type': proc.procedure_type,
'name': proc.name,
'args': proc.args,
'actions': proc.actions,
'meta': proc.meta,
}
@staticmethod
def _serialize_procedure(
proc: Union[dict, Callable], name: Optional[str] = None

View file

@ -8,6 +8,8 @@ class ProcedureEncoder(json.JSONEncoder):
"""
def default(self, o):
from platypush.entities.procedures import ProcedureType
if callable(o):
return {
'type': 'python',
@ -21,4 +23,7 @@ class ProcedureEncoder(json.JSONEncoder):
],
}
if isinstance(o, ProcedureType):
return o.value
return super().default(o)

View file

@ -634,6 +634,7 @@ class SystemPlugin(SensorPlugin, EntityManager):
if fan.get('id') and fan.get('label')
],
*[
(
SystemBattery(
id='system:battery',
name='Battery',
@ -641,6 +642,7 @@ class SystemPlugin(SensorPlugin, EntityManager):
)
if battery
else ()
)
],
]

View file

@ -1,10 +1,12 @@
import enum
import logging
import re
from copy import deepcopy
from dataclasses import dataclass, field
from functools import wraps
from queue import LifoQueue
from typing import Optional
from typing import Any, Dict, Iterable, List, Optional
from ..common import exec_wrapper
from ..config import Config
@ -14,7 +16,7 @@ from ..message.response import Response
logger = logging.getLogger('platypush')
class Statement(enum.Enum):
class StatementType(enum.Enum):
"""
Enumerates the possible statements in a procedure.
"""
@ -22,6 +24,68 @@ class Statement(enum.Enum):
BREAK = 'break'
CONTINUE = 'continue'
RETURN = 'return'
SET = 'set'
@dataclass
class Statement:
"""
Models a statement in a procedure.
"""
type: StatementType
argument: Optional[Any] = None
@classmethod
def build(cls, statement: str):
"""
Builds a statement from a string.
"""
m = re.match(r'\s*return\s*(.*)\s*', statement, re.IGNORECASE)
if m:
return ReturnStatement(argument=m.group(1))
return cls(StatementType(statement.lower()))
def run(self, *_, **__) -> Optional[Any]:
"""
Executes the statement.
"""
@dataclass
class ReturnStatement(Statement):
"""
Models a return statement in a procedure.
"""
type: StatementType = StatementType.RETURN
def run(self, *_, **context) -> Any:
return Response(
output=Request.expand_value_from_context(
self.argument, **_update_context(context)
)
)
@dataclass
class SetStatement(Statement):
"""
Models a set variable statement in a procedure.
"""
type: StatementType = StatementType.SET
vars: dict = field(default_factory=dict)
def run(self, *_, **context):
vars = deepcopy(self.vars) # pylint: disable=redefined-builtin
for k, v in vars.items():
vars[k] = Request.expand_value_from_context(v, **context)
context.update(vars)
return Response(output=vars)
class Procedure:
@ -55,7 +119,6 @@ class Procedure:
requests,
args=None,
backend=None,
id=None, # pylint: disable=redefined-builtin
procedure_class=None,
**kwargs,
):
@ -66,11 +129,32 @@ class Procedure:
if_config = LifoQueue()
procedure_class = procedure_class or cls
key = None
kwargs.pop('id', None)
for request_config in requests:
# Check if it's a break/continue/return statement
if isinstance(request_config, str):
reqs.append(Statement(request_config))
cls._flush_if_statements(reqs, if_config)
reqs.append(Statement.build(request_config))
continue
# Check if it's a return statement with a value
if (
len(request_config.keys()) == 1
and list(request_config.keys())[0] == StatementType.RETURN.value
):
cls._flush_if_statements(reqs, if_config)
reqs.append(
ReturnStatement(argument=request_config[StatementType.RETURN.value])
)
continue
# Check if it's a variable set statement
if (len(request_config.keys()) == 1) and (
list(request_config.keys())[0] == StatementType.SET.value
):
cls._flush_if_statements(reqs, if_config)
reqs.append(SetStatement(vars=request_config[StatementType.SET.value]))
continue
# Check if this request is an if-else
@ -79,6 +163,7 @@ class Procedure:
m = re.match(r'\s*(if)\s+\${(.*)}\s*', key)
if m:
cls._flush_if_statements(reqs, if_config)
if_count += 1
if_name = f'{name}__if_{if_count}'
condition = m.group(2)
@ -91,7 +176,6 @@ class Procedure:
'condition': condition,
'else_branch': [],
'backend': backend,
'id': id,
}
)
@ -132,7 +216,6 @@ class Procedure:
_async=_async,
requests=request_config[key],
backend=backend,
id=id,
iterator_name=iterator_name,
iterable=iterable,
)
@ -156,23 +239,19 @@ class Procedure:
requests=request_config[key],
condition=condition,
backend=backend,
id=id,
)
reqs.append(loop)
continue
request_config['origin'] = Config.get('device_id')
request_config['id'] = id
if 'target' not in request_config:
request_config['target'] = request_config['origin']
request = Request.build(request_config)
reqs.append(request)
while not if_config.empty():
pending_if = if_config.get()
reqs.append(IfProcedure.build(**pending_if))
cls._flush_if_statements(reqs, if_config)
return procedure_class(
name=name,
@ -184,84 +263,101 @@ class Procedure:
)
@staticmethod
def _find_nearest_loop(stack):
for proc in stack[::-1]:
if isinstance(proc, LoopProcedure):
return proc
raise AssertionError('break/continue statement found outside of a loop')
def _flush_if_statements(requests: List, if_config: LifoQueue):
while not if_config.empty():
pending_if = if_config.get()
requests.append(IfProcedure.build(**pending_if))
# pylint: disable=too-many-branches,too-many-statements
def execute(self, n_tries=1, __stack__=None, **context):
def execute(
self,
n_tries: int = 1,
__stack__: Optional[Iterable] = None,
new_context: Optional[Dict[str, Any]] = None,
**context,
):
"""
Execute the requests in the procedure.
:param n_tries: Number of tries in case of failure before raising a RuntimeError.
"""
if not __stack__:
__stack__ = [self]
else:
__stack__.append(self)
__stack__ = (self,) if not __stack__ else (self, *__stack__)
new_context = new_context or {}
if self.args:
args = self.args.copy()
for k, v in args.items():
v = Request.expand_value_from_context(v, **context)
args[k] = v
context[k] = v
args[k] = context[k] = Request.expand_value_from_context(v, **context)
logger.info('Executing procedure %s with arguments %s', self.name, args)
else:
logger.info('Executing procedure %s', self.name)
response = Response()
token = Config.get('token')
context = _update_context(context)
locals().update(context)
# pylint: disable=too-many-nested-blocks
for request in self.requests:
if callable(request):
response = request(**context)
continue
context['_async'] = self._async
context['n_tries'] = n_tries
context['__stack__'] = __stack__
context['new_context'] = new_context
if isinstance(request, Statement):
if request == Statement.RETURN:
if isinstance(request, ReturnStatement):
response = request.run(**context)
self._should_return = True
for proc in __stack__:
proc._should_return = True # pylint: disable=protected-access
break
if request in [Statement.BREAK, Statement.CONTINUE]:
loop = self._find_nearest_loop(__stack__)
if request == Statement.BREAK:
loop._should_break = True # pylint: disable=protected-access
if isinstance(request, SetStatement):
rs: dict = request.run(**context).output # type: ignore
context.update(rs)
new_context.update(rs)
locals().update(rs)
continue
if request.type in [StatementType.BREAK, StatementType.CONTINUE]:
for proc in __stack__:
if isinstance(proc, LoopProcedure):
if request.type == StatementType.BREAK:
setattr(proc, '_should_break', True) # noqa: B010
else:
loop._should_continue = True # pylint: disable=protected-access
setattr(proc, '_should_continue', True) # noqa: B010
break
proc._should_return = True # pylint: disable=protected-access
break
should_continue = getattr(self, '_should_continue', False)
should_break = getattr(self, '_should_break', False)
if isinstance(self, LoopProcedure) and (should_continue or should_break):
if should_continue:
self._should_continue = ( # pylint: disable=attribute-defined-outside-init
False
)
if self._should_return or should_continue or should_break:
break
if token and not isinstance(request, Statement):
request.token = token
context['_async'] = self._async
context['n_tries'] = n_tries
exec_ = getattr(request, 'execute', None)
if callable(exec_):
response = exec_(__stack__=__stack__, **context)
response = exec_(**context)
context.update(context.get('new_context', {}))
if not self._async and response:
if isinstance(response.output, dict):
for k, v in response.output.items():
context[k] = v
context.update(response.output)
context['output'] = response.output
context['errors'] = response.errors
new_context.update(context)
locals().update(context)
if self._should_return:
break
@ -282,10 +378,8 @@ class LoopProcedure(Procedure):
Base class while and for/fork loops.
"""
def __init__(self, name, requests, _async=False, args=None, backend=None):
super().__init__(
name=name, _async=_async, requests=requests, args=args, backend=backend
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._should_break = False
self._should_continue = False
@ -330,6 +424,9 @@ class ForProcedure(LoopProcedure):
# pylint: disable=eval-used
def execute(self, *_, **context):
ctx = _update_context(context)
locals().update(ctx)
try:
iterable = eval(self.iterable)
assert hasattr(
@ -337,11 +434,18 @@ class ForProcedure(LoopProcedure):
), f'Object of type {type(iterable)} is not iterable: {iterable}'
except Exception as e:
logger.debug('Iterable %s expansion error: %s', self.iterable, e)
iterable = Request.expand_value_from_context(self.iterable, **context)
iterable = Request.expand_value_from_context(self.iterable, **ctx)
response = Response()
for item in iterable:
ctx[self.iterator_name] = item
response = super().execute(**ctx)
ctx.update(ctx.get('new_context', {}))
if response.output and isinstance(response.output, dict):
ctx = _update_context(ctx, **response.output)
if self._should_return:
logger.info('Returning from %s', self.name)
break
@ -356,9 +460,6 @@ class ForProcedure(LoopProcedure):
logger.info('Breaking loop %s', self.name)
break
context[self.iterator_name] = item
response = super().execute(**context)
return response
@ -395,41 +496,23 @@ class WhileProcedure(LoopProcedure):
)
self.condition = condition
@staticmethod
def _get_context(**context):
for k, v in context.items():
try:
context[k] = eval(v) # pylint: disable=eval-used
except Exception as e:
logger.debug('Evaluation error for %s=%s: %s', k, v, e)
if isinstance(v, str):
try:
context[k] = eval( # pylint: disable=eval-used
'"' + re.sub(r'(^|[^\\])"', '\1\\"', v) + '"'
)
except Exception as ee:
logger.warning(
'Could not parse value for context variable %s=%s: %s',
k,
v,
ee,
)
logger.warning('Context: %s', context)
logger.exception(e)
return context
def execute(self, *_, **context):
response = Response()
context = self._get_context(**context)
for k, v in context.items():
locals()[k] = v
ctx = _update_context(context)
locals().update(ctx)
while True:
condition_true = eval(self.condition) # pylint: disable=eval-used
if not condition_true:
break
response = super().execute(**ctx)
ctx.update(ctx.get('new_context', {}))
if response.output and isinstance(response.output, dict):
_update_context(ctx, **response.output)
locals().update(ctx)
if self._should_return:
logger.info('Returning from %s', self.name)
break
@ -444,13 +527,6 @@ class WhileProcedure(LoopProcedure):
logger.info('Breaking loop %s', self.name)
break
response = super().execute(**context)
if response.output and isinstance(response.output, dict):
new_context = self._get_context(**response.output)
for k, v in new_context.items():
locals()[k] = v
return response
@ -544,20 +620,28 @@ class IfProcedure(Procedure):
)
def execute(self, *_, **context):
for k, v in context.items():
locals()[k] = v
ctx = _update_context(context)
locals().update(ctx)
condition_true = eval(self.condition) # pylint: disable=eval-used
response = Response()
if condition_true:
response = super().execute(**context)
response = super().execute(**ctx)
elif self.else_branch:
response = self.else_branch.execute(**context)
response = self.else_branch.execute(**ctx)
return response
def _update_context(context: Optional[Dict[str, Any]] = None, **kwargs):
ctx = context or {}
ctx = {**ctx.get('context', {}), **ctx, **kwargs}
for k, v in ctx.items():
ctx[k] = Request.expand_value_from_context(v, **ctx)
return ctx
def procedure(name_or_func: Optional[str] = None, *upper_args, **upper_kwargs):
name = name_or_func if isinstance(name_or_func, str) else None