[Shell plugin] Added support for async output over websockets.

This commit is contained in:
Fabio Manganiello 2023-10-17 15:41:26 +02:00
parent 2806e943c3
commit 20a2203e7e
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
4 changed files with 174 additions and 23 deletions

View file

@ -117,6 +117,7 @@ class PubSubMixin:
"""
try:
with self.pubsub as pubsub:
pubsub.subscribe(*self._subscriptions)
for msg in pubsub.listen():
channel = msg.get('channel', b'').decode()
if msg.get('type') != 'message' or not (

View file

@ -0,0 +1,66 @@
from base64 import b64decode
import json
from typing import Optional
from platypush.common.cmd_stream import redis_topic
from . import WSRoute, logger
class WSCommandOutput(WSRoute):
"""
Websocket route that pushes the output of an executed command to the client
as it is generated. Mapped to ``/ws/shell``.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, subscriptions=[redis_topic], **kwargs)
self._id = None
@classmethod
def app_name(cls) -> str:
return 'shell'
@classmethod
def path(cls) -> str:
return f'/ws/{cls.app_name()}'
def _parse_msg(self, msg: bytes) -> Optional[bytes]:
parsed_msg = json.loads(msg)
cmd_id = parsed_msg.get('id')
output = parsed_msg.get('output')
if output is None: # End-of-stream
raise StopIteration()
if cmd_id != self._id:
return None
return b64decode(output)
def open(self, *args, **kwargs):
self._id = next(iter(self.request.arguments['id']), b'').decode() or None
super().open(*args, **kwargs)
def run(self) -> None:
super().run()
for msg in self.listen():
try:
output = self._parse_msg(msg.data)
if output is None:
continue
self.send(output)
except StopIteration:
break
except Exception as e:
logger.warning('Failed to parse message: %s', e)
logger.exception(e)
continue
self._io_loop.add_callback(self._ws_close)
def _ws_close(self):
if not self.ws_connection:
return
self.ws_connection.close(1000, 'Command terminated')

View file

@ -0,0 +1,4 @@
from platypush.config import Config
redis_topic = f'_platypush/{Config.get("device_id")}/shell/cmd'

View file

@ -1,39 +1,119 @@
from base64 import b64encode
import json
import threading
import subprocess
from typing import Callable, Optional
from uuid import uuid1
from platypush.common.cmd_stream import redis_topic
from platypush.plugins import Plugin, action
from platypush.utils import get_redis
class ShellPlugin(Plugin):
"""
Plugin to run custom shell commands.
Plugin to run shell commands.
"""
@action
def exec(self, cmd, background=False, ignore_errors=False):
"""
Execute a command.
:param cmd: Command to execute
:type cmd: str
:param background: If set to True, execute the process in the background, otherwise wait for the process termination and return its output (deafult: False).
:param ignore_errors: If set, then any errors in the command execution will be ignored. Otherwise a RuntimeError will be thrown (default value: False)
:returns: A response object where the ``output`` field will contain the command output as a string, and the ``errors`` field will contain whatever was sent to stderr.
"""
if background:
subprocess.Popen(cmd, shell=True)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def _exec(self, func: Callable, cmd: str, ignore_errors: bool = False):
try:
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, shell=True).decode('utf-8')
return func(cmd)
except subprocess.CalledProcessError as e:
if ignore_errors:
self.logger.warning('Command {} failed with error: {}'.format(
cmd, e.output.decode('utf-8')))
else:
raise RuntimeError(e.output.decode('utf-8'))
self.logger.warning(
'Command %s failed with error: %s', cmd, e.output.decode()
)
return None
raise RuntimeError(e.output.decode()) from e
def _exec_simple(self, cmd: str):
return subprocess.check_output(
cmd, stderr=subprocess.STDOUT, shell=True
).decode()
@staticmethod
def _send_ws_output(cmd_id: str, buf: Optional[bytes]):
get_redis().publish(
redis_topic,
json.dumps(
{
'id': cmd_id,
'output': (b64encode(buf).decode() if buf is not None else None),
}
),
)
def _exec_ws_thread(self, cmd: str, cmd_id: str):
try:
with subprocess.Popen(
cmd,
shell=True,
stdout=subprocess.PIPE,
) as proc:
while proc.poll() is None:
if not proc.stdout:
break
while True:
buf = proc.stdout.read(1024)
if not buf:
break
self._send_ws_output(cmd_id, buf)
finally:
self._send_ws_output(cmd_id, None)
def _exec_ws(self, cmd: str):
cmd_id = str(uuid1())
threading.Thread(target=self._exec_ws_thread, args=(cmd, cmd_id)).start()
return {'ws_path': f'/ws/shell?id={cmd_id}'}
@action
def exec(
self,
cmd: str,
background: bool = False,
ws: bool = False,
ignore_errors: bool = False,
):
"""
Run a command.
:param cmd: Command to execute
:param background: If set to True, execute the process in the
background, otherwise wait for the process termination and return
its output (default: False).
:param ignore_errors: If set, then any errors in the command execution
will be ignored. Otherwise a RuntimeError will be thrown (default
value: False)
:param ws: If set to True then the output of the command will be
sent asynchronously over a websocket channel (default: False).
In this case, the method will return a response in the format:
.. code-block:: json
{
"ws_path": "/ws/shell?id=<cmd_id>"
}
Where ``ws_path`` is the websocket path where the output of the
command will be sent. The websocket will be closed when the command
terminates.
:returns: A response object where the ``output`` field will contain the
command output as a string, and the ``errors`` field will contain
whatever was sent to stderr.
"""
if background:
subprocess.Popen(cmd, shell=True)
return None
func = self._exec_ws if ws else self._exec_simple
return self._exec(func, cmd, ignore_errors=ignore_errors)
# vim:sw=4:ts=4:et: