From 20a2203e7e81577a7f85f36c6799ba83887f0608 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Tue, 17 Oct 2023 15:41:26 +0200 Subject: [PATCH] [Shell plugin] Added support for async output over websockets. --- platypush/backend/http/app/mixins/__init__.py | 1 + platypush/backend/http/app/ws/cmd.py | 66 +++++++++ platypush/common/cmd_stream.py | 4 + platypush/plugins/shell/__init__.py | 126 ++++++++++++++---- 4 files changed, 174 insertions(+), 23 deletions(-) create mode 100644 platypush/backend/http/app/ws/cmd.py create mode 100644 platypush/common/cmd_stream.py diff --git a/platypush/backend/http/app/mixins/__init__.py b/platypush/backend/http/app/mixins/__init__.py index 516b1117..eec40e6b 100644 --- a/platypush/backend/http/app/mixins/__init__.py +++ b/platypush/backend/http/app/mixins/__init__.py @@ -117,6 +117,7 @@ class PubSubMixin: """ try: with self.pubsub as pubsub: + pubsub.subscribe(*self._subscriptions) for msg in pubsub.listen(): channel = msg.get('channel', b'').decode() if msg.get('type') != 'message' or not ( diff --git a/platypush/backend/http/app/ws/cmd.py b/platypush/backend/http/app/ws/cmd.py new file mode 100644 index 00000000..fa717381 --- /dev/null +++ b/platypush/backend/http/app/ws/cmd.py @@ -0,0 +1,66 @@ +from base64 import b64decode +import json +from typing import Optional + +from platypush.common.cmd_stream import redis_topic + +from . import WSRoute, logger + + +class WSCommandOutput(WSRoute): + """ + Websocket route that pushes the output of an executed command to the client + as it is generated. Mapped to ``/ws/shell``. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, subscriptions=[redis_topic], **kwargs) + self._id = None + + @classmethod + def app_name(cls) -> str: + return 'shell' + + @classmethod + def path(cls) -> str: + return f'/ws/{cls.app_name()}' + + def _parse_msg(self, msg: bytes) -> Optional[bytes]: + parsed_msg = json.loads(msg) + cmd_id = parsed_msg.get('id') + output = parsed_msg.get('output') + if output is None: # End-of-stream + raise StopIteration() + + if cmd_id != self._id: + return None + + return b64decode(output) + + def open(self, *args, **kwargs): + self._id = next(iter(self.request.arguments['id']), b'').decode() or None + super().open(*args, **kwargs) + + def run(self) -> None: + super().run() + for msg in self.listen(): + try: + output = self._parse_msg(msg.data) + if output is None: + continue + + self.send(output) + except StopIteration: + break + except Exception as e: + logger.warning('Failed to parse message: %s', e) + logger.exception(e) + continue + + self._io_loop.add_callback(self._ws_close) + + def _ws_close(self): + if not self.ws_connection: + return + + self.ws_connection.close(1000, 'Command terminated') diff --git a/platypush/common/cmd_stream.py b/platypush/common/cmd_stream.py new file mode 100644 index 00000000..efa7bdb5 --- /dev/null +++ b/platypush/common/cmd_stream.py @@ -0,0 +1,4 @@ +from platypush.config import Config + + +redis_topic = f'_platypush/{Config.get("device_id")}/shell/cmd' diff --git a/platypush/plugins/shell/__init__.py b/platypush/plugins/shell/__init__.py index dfb10018..dbc9e800 100644 --- a/platypush/plugins/shell/__init__.py +++ b/platypush/plugins/shell/__init__.py @@ -1,39 +1,119 @@ +from base64 import b64encode +import json +import threading import subprocess +from typing import Callable, Optional +from uuid import uuid1 +from platypush.common.cmd_stream import redis_topic from platypush.plugins import Plugin, action +from platypush.utils import get_redis class ShellPlugin(Plugin): """ - Plugin to run custom shell commands. + Plugin to run shell commands. """ - @action - def exec(self, cmd, background=False, ignore_errors=False): - """ - Execute a command. - - :param cmd: Command to execute - :type cmd: str - - :param background: If set to True, execute the process in the background, otherwise wait for the process termination and return its output (deafult: False). - :param ignore_errors: If set, then any errors in the command execution will be ignored. Otherwise a RuntimeError will be thrown (default value: False) - :returns: A response object where the ``output`` field will contain the command output as a string, and the ``errors`` field will contain whatever was sent to stderr. - """ - - if background: - subprocess.Popen(cmd, shell=True) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + def _exec(self, func: Callable, cmd: str, ignore_errors: bool = False): try: - return subprocess.check_output( - cmd, stderr=subprocess.STDOUT, shell=True).decode('utf-8') + return func(cmd) except subprocess.CalledProcessError as e: if ignore_errors: - self.logger.warning('Command {} failed with error: {}'.format( - cmd, e.output.decode('utf-8'))) - else: - raise RuntimeError(e.output.decode('utf-8')) + self.logger.warning( + 'Command %s failed with error: %s', cmd, e.output.decode() + ) + return None + + raise RuntimeError(e.output.decode()) from e + + def _exec_simple(self, cmd: str): + return subprocess.check_output( + cmd, stderr=subprocess.STDOUT, shell=True + ).decode() + + @staticmethod + def _send_ws_output(cmd_id: str, buf: Optional[bytes]): + get_redis().publish( + redis_topic, + json.dumps( + { + 'id': cmd_id, + 'output': (b64encode(buf).decode() if buf is not None else None), + } + ), + ) + + def _exec_ws_thread(self, cmd: str, cmd_id: str): + try: + with subprocess.Popen( + cmd, + shell=True, + stdout=subprocess.PIPE, + ) as proc: + while proc.poll() is None: + if not proc.stdout: + break + + while True: + buf = proc.stdout.read(1024) + if not buf: + break + + self._send_ws_output(cmd_id, buf) + finally: + self._send_ws_output(cmd_id, None) + + def _exec_ws(self, cmd: str): + cmd_id = str(uuid1()) + threading.Thread(target=self._exec_ws_thread, args=(cmd, cmd_id)).start() + return {'ws_path': f'/ws/shell?id={cmd_id}'} + + @action + def exec( + self, + cmd: str, + background: bool = False, + ws: bool = False, + ignore_errors: bool = False, + ): + """ + Run a command. + + :param cmd: Command to execute + :param background: If set to True, execute the process in the + background, otherwise wait for the process termination and return + its output (default: False). + :param ignore_errors: If set, then any errors in the command execution + will be ignored. Otherwise a RuntimeError will be thrown (default + value: False) + :param ws: If set to True then the output of the command will be + sent asynchronously over a websocket channel (default: False). + In this case, the method will return a response in the format: + + .. code-block:: json + + { + "ws_path": "/ws/shell?id=" + } + + Where ``ws_path`` is the websocket path where the output of the + command will be sent. The websocket will be closed when the command + terminates. + + :returns: A response object where the ``output`` field will contain the + command output as a string, and the ``errors`` field will contain + whatever was sent to stderr. + """ + if background: + subprocess.Popen(cmd, shell=True) + return None + + func = self._exec_ws if ws else self._exec_simple + return self._exec(func, cmd, ignore_errors=ignore_errors) # vim:sw=4:ts=4:et: -