From efef9d7bc09fd2d26d920e51ef88fbfc5cb3fe06 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 13 Aug 2023 23:21:36 +0200 Subject: [PATCH] Added `commands` module. --- platypush/commands/__init__.py | 0 platypush/commands/_base.py | 78 ++++++++++++++++++++++++++++++++++ platypush/commands/_reader.py | 69 ++++++++++++++++++++++++++++++ platypush/commands/_writer.py | 25 +++++++++++ 4 files changed, 172 insertions(+) create mode 100644 platypush/commands/__init__.py create mode 100644 platypush/commands/_base.py create mode 100644 platypush/commands/_reader.py create mode 100644 platypush/commands/_writer.py diff --git a/platypush/commands/__init__.py b/platypush/commands/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/platypush/commands/_base.py b/platypush/commands/_base.py new file mode 100644 index 0000000000..e029687341 --- /dev/null +++ b/platypush/commands/_base.py @@ -0,0 +1,78 @@ +from abc import ABC, abstractmethod +import json +from logging import getLogger, Logger + + +class Command(ABC): + """ + Base class for application commands. + """ + + END_OF_COMMAND = b'\x00' + """End-of-command marker.""" + + def __init__(self, **args) -> None: + self.args = args + + @property + def logger(self) -> Logger: + """ + The command class logger. + """ + return getLogger(self.__class__.__name__) + + @abstractmethod + def __call__(self, app, *_, **__): + """ + Execute the command. + """ + raise NotImplementedError() + + def __str__(self) -> str: + """ + :return: A JSON representation of the command. + """ + return json.dumps( + { + 'type': 'command', + 'command': self.__class__.__name__, + 'args': self.args, + } + ) + + def to_bytes(self): + """ + :return: A JSON representation of the command. + """ + return str(self).encode('utf-8') + self.END_OF_COMMAND + + @classmethod + def parse(cls, data: bytes) -> "Command": + """ + :param data: A JSON representation of the command. + :raise ValueError: If the data is invalid. + :return: The command instance or None if the data is invalid. + """ + import platypush.commands + + try: + json_data = json.loads(data.decode('utf-8')) + except json.JSONDecodeError as e: + raise ValueError from e + + kind = json_data.pop('type', None) + if kind != 'command': + raise ValueError(f'Invalid command type: {kind}') + + command_name = json_data.get('command') + if not command_name: + raise ValueError(f'Invalid command name: {command_name}') + + cmd_class = getattr(platypush.commands, command_name, None) + if not (cmd_class and issubclass(cmd_class, Command)): + raise ValueError(f'Invalid command class: {command_name}') + + try: + return cmd_class(**json_data.get('args', {})) + except Exception as e: + raise ValueError(e) from e diff --git a/platypush/commands/_reader.py b/platypush/commands/_reader.py new file mode 100644 index 0000000000..8eee5a3fc7 --- /dev/null +++ b/platypush/commands/_reader.py @@ -0,0 +1,69 @@ +from logging import getLogger +from socket import socket +from typing import Optional + +from platypush.commands import Command + + +# pylint: disable=too-few-public-methods +class CommandReader: + """ + Reads command objects from file-like I/O objects. + """ + + _max_bufsize = 8192 + """Maximum size of a command that can be queued in the stream.""" + + _bufsize = 1024 + """Size of the buffer used to read commands from the socket.""" + + def __init__(self): + self.logger = getLogger(__name__) + self._buf = bytes() + + def _parse_command(self, data: bytes) -> Optional[Command]: + """ + Parses a command from the received data. + + :param data: Data received from the socket + :return: The parsed command + """ + try: + return Command.parse(data) + except ValueError as e: + self.logger.warning('Error while parsing command: %s', e) + return None + + def read(self, sock: socket) -> Optional[Command]: + """ + Parses the next command from the file-like I/O object. + + :param fp: The file-like I/O object to read from. + :return: The parsed command. + """ + try: + data = sock.recv(self._bufsize) + except OSError as e: + self.logger.warning( + 'Error while reading from socket %s: %s', sock.getsockname(), e + ) + return None + + for ch in data: + if bytes([ch]) == Command.END_OF_COMMAND: + cmd = self._parse_command(self._buf) + self._buf = bytes() + + if cmd: + return cmd + elif len(self._buf) >= self._max_bufsize: + self.logger.warning( + 'The received command is too long: length=%d', len(self._buf) + ) + + self._buf = bytes() + break + else: + self._buf += bytes([ch]) + + return None diff --git a/platypush/commands/_writer.py b/platypush/commands/_writer.py new file mode 100644 index 0000000000..cf701128da --- /dev/null +++ b/platypush/commands/_writer.py @@ -0,0 +1,25 @@ +from logging import getLogger +from socket import socket + +from platypush.commands import Command + + +# pylint: disable=too-few-public-methods +class CommandWriter: + """ + Writes command objects to file-like I/O objects. + """ + + def __init__(self): + self.logger = getLogger(__name__) + + def write(self, cmd: Command, sock: socket): + """ + Writes a command to a file-like I/O object. + + :param cmd: The command to write. + :param fp: The file-like I/O object to write to. + """ + + buf = cmd.to_bytes() + sock.sendall(buf)