diff --git a/docs/source/_ext/add_dependencies.py b/docs/source/_ext/add_dependencies.py index 8000fd8d45..18c646c04d 100644 --- a/docs/source/_ext/add_dependencies.py +++ b/docs/source/_ext/add_dependencies.py @@ -1,98 +1,188 @@ +import inspect import os import re - -import yaml +import sys +import textwrap as tw +from contextlib import contextmanager from sphinx.application import Sphinx +base_path = os.path.abspath( + os.path.join(os.path.dirname(os.path.relpath(__file__)), '..', '..', '..') +) -def add_events(source: list[str], manifest: dict, idx: int) -> int: - events = manifest.get('events', []) - if not events: - return idx +sys.path.insert(0, base_path) - source.insert( - idx, - 'Triggered events\n----------------\n\n' - + '\n'.join(f'\t- :class:`{event}`' for event in events) - + '\n\n', - ) - - return idx + 1 +from platypush.utils import get_plugin_name_by_class # noqa +from platypush.utils.mock import mock # noqa +from platypush.utils.reflection import IntegrationMetadata, import_file # noqa -def add_install_deps(source: list[str], manifest: dict, idx: int) -> int: - install_deps = manifest.get('install', {}) - install_cmds = { - 'pip': 'pip install', - 'Alpine': 'apk add', - 'Arch Linux': 'pacman -S', - 'Debian': 'apt install', - 'Fedora': 'yum install', - } +class IntegrationEnricher: + @staticmethod + def add_events(source: list[str], manifest: IntegrationMetadata, idx: int) -> int: + if not manifest.events: + return idx - parsed_deps = { - 'pip': install_deps.get('pip', []), - 'Alpine': install_deps.get('apk', []), - 'Arch Linux': install_deps.get('pacman', []), - 'Debian': install_deps.get('apt', []), - 'Fedora': install_deps.get('dnf', install_deps.get('yum', [])), - } + source.insert( + idx, + 'Triggered events\n----------------\n\n' + + '\n'.join( + f'\t- :class:`{event.__module__}.{event.__qualname__}`' + for event in manifest.events + ) + + '\n\n', + ) - if not any(parsed_deps.values()): - return idx + return idx + 1 - source.insert(idx, 'Dependencies\n^^^^^^^^^^^^\n\n') - idx += 1 + @staticmethod + def add_actions(source: list[str], manifest: IntegrationMetadata, idx: int) -> int: + if not (manifest.actions and manifest.cls): + return idx + + source.insert( + idx, + 'Actions\n-------\n\n' + + '\n'.join( + f'\t- `{get_plugin_name_by_class(manifest.cls)}.{action} ' + + f'<#{manifest.cls.__module__}.{manifest.cls.__qualname__}.{action}>`_' + for action in sorted(manifest.actions.keys()) + ) + + '\n\n', + ) + + return idx + 1 + + @staticmethod + def _shellify(title: str, cmd: str) -> str: + return f'**{title}**\n\n' + '.. code-block:: bash\n\n\t' + cmd + '\n\n' + + @classmethod + def add_install_deps( + cls, source: list[str], manifest: IntegrationMetadata, idx: int + ) -> int: + deps = manifest.deps + parsed_deps = { + 'before': deps.before, + 'pip': deps.pip, + 'after': deps.after, + } + + if not (any(parsed_deps.values()) or deps.by_pkg_manager): + return idx + + source.insert(idx, 'Dependencies\n------------\n\n') + idx += 1 + + if parsed_deps['before']: + source.insert(idx, cls._shellify('Pre-install', '\n'.join(deps.before))) + idx += 1 + + if parsed_deps['pip']: + source.insert(idx, cls._shellify('pip', 'pip ' + ' '.join(deps.pip))) + idx += 1 + + for pkg_manager, sys_deps in deps.by_pkg_manager.items(): + if not sys_deps: + continue - for env, deps in parsed_deps.items(): - if deps: - install_cmd = install_cmds[env] source.insert( idx, - f'**{env}**\n\n' - + '.. code-block:: bash\n\n\t' - + f'{install_cmd} ' - + ' '.join(deps) - + '\n\n', + cls._shellify( + pkg_manager.value.default_os.value.description, + pkg_manager.value.install_doc + ' ' + ' '.join(sys_deps), + ), ) idx += 1 - return idx + if parsed_deps['after']: + source.insert(idx, cls._shellify('Post-install', '\n'.join(deps.after))) + idx += 1 + + return idx + + @classmethod + def add_description( + cls, source: list[str], manifest: IntegrationMetadata, idx: int + ) -> int: + docs = ( + doc + for doc in ( + inspect.getdoc(manifest.cls) or '', + manifest.constructor.doc if manifest.constructor else '', + ) + if doc + ) + + if not docs: + return idx + + docstring = '\n\n'.join(docs) + source.insert(idx, f"Description\n-----------\n\n{docstring}\n\n") + return idx + 1 + + @classmethod + def add_conf_snippet( + cls, source: list[str], manifest: IntegrationMetadata, idx: int + ) -> int: + source.insert( + idx, + tw.dedent( + f""" + Configuration + ------------- + + .. code-block:: yaml + +{tw.indent(manifest.config_snippet, ' ')} + """ + ), + ) + + return idx + 1 + + def __call__(self, _: Sphinx, doc: str, source: list[str]): + if not (source and re.match(r'^platypush/(backend|plugins)/.*', doc)): + return + + src = [src.split('\n') for src in source][0] + if len(src) < 3: + return + + manifest_file = os.path.join( + base_path, + *doc.split(os.sep)[:-1], + *doc.split(os.sep)[-1].split('.'), + 'manifest.yaml', + ) + + if not os.path.isfile(manifest_file): + return + + with mock_imports(): + manifest = IntegrationMetadata.from_manifest(manifest_file) + idx = self.add_description(src, manifest, idx=3) + idx = self.add_conf_snippet(src, manifest, idx=idx) + idx = self.add_install_deps(src, manifest, idx=idx) + idx = self.add_events(src, manifest, idx=idx) + idx = self.add_actions(src, manifest, idx=idx) + + src.insert(idx, '\n\nModule reference\n----------------\n\n') + source[0] = '\n'.join(src) -def parse_dependencies(_: Sphinx, doc: str, source: list[str]): - if not (source and re.match(r'^platypush/(backend|plugins)/.*', doc)): - return - - src = [src.split('\n') for src in source][0] - if len(src) < 3: - return - - base_path = os.path.abspath( - os.path.join(os.path.dirname(os.path.relpath(__file__)), '..', '..', '..') - ) - manifest_file = os.path.join( - base_path, - *doc.split(os.sep)[:-1], - *doc.split(os.sep)[-1].split('.'), - 'manifest.yaml', - ) - if not os.path.isfile(manifest_file): - return - - with open(manifest_file) as f: - manifest: dict = yaml.safe_load(f).get('manifest', {}) - - idx = add_install_deps(src, manifest, idx=3) - add_events(src, manifest, idx=idx) - source[0] = '\n'.join(src) +@contextmanager +def mock_imports(): + conf_mod = import_file(os.path.join(base_path, 'docs', 'source', 'conf.py')) + mock_mods = getattr(conf_mod, 'autodoc_mock_imports', []) + with mock(*mock_mods): + yield def setup(app: Sphinx): - app.connect('source-read', parse_dependencies) - + app.connect('source-read', IntegrationEnricher()) return { 'version': '0.1', 'parallel_read_safe': True, diff --git a/platypush/utils/__init__.py b/platypush/utils/__init__.py index 52516221ad..8fee2ba991 100644 --- a/platypush/utils/__init__.py +++ b/platypush/utils/__init__.py @@ -5,17 +5,17 @@ import hashlib import importlib import inspect import logging -from multiprocessing import Lock as PLock import os import pathlib import re import signal import socket import ssl -import urllib.request -from threading import Lock as TLock -from tempfile import gettempdir import time +import urllib.request +from multiprocessing import Lock as PLock +from tempfile import gettempdir +from threading import Lock as TLock from typing import Generator, Optional, Tuple, Type, Union from dateutil import parser, tz diff --git a/platypush/utils/manifest.py b/platypush/utils/manifest.py index 8ca220469d..8eae5d51be 100644 --- a/platypush/utils/manifest.py +++ b/platypush/utils/manifest.py @@ -28,7 +28,6 @@ from typing import ( import yaml -from platypush.message.event import Event from platypush.utils import get_src_root, is_root _available_package_manager = None @@ -52,6 +51,28 @@ class BaseImage(Enum): return self.value +@dataclass +class OSMeta: + """ + Operating system metadata. + """ + + name: str + description: str + + +class OS(Enum): + """ + Supported operating systems. + """ + + ALPINE = OSMeta('alpine', 'Alpine') + ARCH = OSMeta('arch', 'Arch Linux') + DEBIAN = OSMeta('debian', 'Debian') + FEDORA = OSMeta('fedora', 'Fedora') + UBUNTU = OSMeta('ubuntu', 'Ubuntu') + + @dataclass class PackageManager: """ @@ -60,11 +81,13 @@ class PackageManager: executable: str """ The executable name. """ - default_os: str + default_os: OS """ The default distro whose configuration we should use if this package manager is detected. """ + install_doc: str + """ The base install command that will be used in the generated documentation. """ install: Sequence[str] = field(default_factory=tuple) """ The install command, as a sequence of strings. """ uninstall: Sequence[str] = field(default_factory=tuple) @@ -79,8 +102,8 @@ class PackageManager: def _get_installed(self) -> Sequence[str]: """ - :return: The install context-aware list of installed packages. - It should only used within the context of :meth:`.get_installed`. + :return: The context-aware list of installed packages. + It should only be used within the context of :meth:`.get_installed`. """ if os.environ.get('DOCKER_CTX'): @@ -114,40 +137,57 @@ class PackageManagers(Enum): APK = PackageManager( executable='apk', + install_doc='apk add', install=('apk', 'add', '--update', '--no-interactive', '--no-cache'), uninstall=('apk', 'del', '--no-interactive'), list=('apk', 'list', '--installed'), - default_os='alpine', - parse_list_line=lambda line: re.sub(r'.*\s*\{(.+?)\}\s*.*', r'\1', line), + default_os=OS.ALPINE, + parse_list_line=lambda line: re.sub(r'.*\s*\{(.+?)}\s*.*', r'\1', line), ) APT = PackageManager( executable='apt', + install_doc='apt install', install=('apt', 'install', '-y'), uninstall=('apt', 'remove', '-y'), list=('apt', 'list', '--installed'), - default_os='debian', + default_os=OS.DEBIAN, parse_list_line=lambda line: line.split('/')[0], ) DNF = PackageManager( executable='dnf', + install_doc='yum install', install=('dnf', 'install', '-y'), uninstall=('dnf', 'remove', '-y'), list=('dnf', 'list', '--installed'), - default_os='fedora', + default_os=OS.FEDORA, parse_list_line=lambda line: re.split(r'\s+', line)[0].split('.')[0], ) PACMAN = PackageManager( executable='pacman', + install_doc='pacman -S', install=('pacman', '-S', '--noconfirm', '--needed'), uninstall=('pacman', '-R', '--noconfirm'), list=('pacman', '-Q'), - default_os='arch', + default_os=OS.ARCH, parse_list_line=lambda line: line.split(' ')[0], ) + @classmethod + def by_executable(cls, name: str) -> "PackageManagers": + """ + :param name: The name of the package manager executable to get the + package manager for. + :return: The `PackageManager` object for the given executable. + """ + pkg_manager = next(iter(pm for pm in cls if pm.value.executable == name), None) + if not pkg_manager: + raise ValueError(f'Unknown package manager: {name}') + + return pkg_manager + @classmethod def get_command(cls, name: str) -> Iterable[str]: """ @@ -230,6 +270,8 @@ class Dependencies: """ The installation context - Docker, virtual environment or bare metal. """ base_image: Optional[BaseImage] = None """ Base image used in case of Docker installations. """ + by_pkg_manager: Dict[PackageManagers, Set[str]] = field(default_factory=dict) + """ All system dependencies, grouped by package manager. """ @property def _is_venv(self) -> bool: @@ -313,7 +355,7 @@ class Dependencies: return cls._parse_requirements_file( os.path.join( - cls._get_requirements_dir(), pkg_manager.value.default_os + '.txt' + cls._get_requirements_dir(), pkg_manager.value.default_os.name + '.txt' ), install_context, ) @@ -484,15 +526,17 @@ class Manifest(ABC): deps.before += items elif key == 'after': deps.after += items - elif self._pkg_manager and key == self._pkg_manager.value.executable: - deps.packages.update(items) + else: + deps.by_pkg_manager[PackageManagers.by_executable(key)] = set(items) + if self._pkg_manager and key == self._pkg_manager.value.executable: + deps.packages.update(items) return deps @staticmethod def _init_events( events: Union[Iterable[str], Mapping[str, Optional[str]]] - ) -> Dict[Type[Event], str]: + ) -> Dict[Type, str]: evt_dict = events if isinstance(events, Mapping) else {e: None for e in events} ret = {} diff --git a/platypush/utils/mock.py b/platypush/utils/mock.py new file mode 100644 index 0000000000..11ea777e67 --- /dev/null +++ b/platypush/utils/mock.py @@ -0,0 +1,197 @@ +import os +import sys +from contextlib import contextmanager +from importlib.abc import Loader, MetaPathFinder +from importlib.machinery import ModuleSpec +from types import ModuleType +from typing import Any, Iterator, Sequence, Generator, Optional, List + + +class MockObject: + """ + Generic object that can be used to mock anything. + """ + + __display_name__ = "MockObject" + __name__ = "" + __decorator_args__: tuple[Any, ...] = () + + def __new__(cls, *args: Any, **_) -> Any: + if len(args) == 3 and isinstance(args[1], tuple): + superclass = args[1][-1].__class__ + if superclass is cls: + # subclassing MockObject + return _make_subclass( + args[0], + superclass.__display_name__, + superclass=superclass, + attributes=args[2], + ) + + return super().__new__(cls) + + def __init__(self, *_, **__) -> None: + self.__qualname__ = self.__name__ + + def __len__(self) -> int: + """ + Override __len__ so it returns zero. + """ + return 0 + + def __contains__(self, _: str) -> bool: + """ + Override __contains__ so it always returns False. + """ + return False + + def __iter__(self) -> Iterator: + """ + Override __iter__ so it always returns an empty iterator. + """ + return iter([]) + + def __mro_entries__(self, _: tuple) -> tuple: + """ + Override __mro_entries__ so it always returns a tuple containing the + class itself. + """ + return (self.__class__,) + + def __getitem__(self, key: Any) -> "MockObject": + """ + Override __getitem__ so it always returns a new MockObject. + """ + return _make_subclass(str(key), self.__display_name__, self.__class__)() + + def __getattr__(self, key: str) -> "MockObject": + """ + Override __getattr__ so it always returns a new MockObject. + """ + return _make_subclass(key, self.__display_name__, self.__class__)() + + def __call__(self, *args: Any, **_) -> Any: + """ + Override __call__ so it always returns a new MockObject. + """ + call = self.__class__() + call.__decorator_args__ = args + return call + + def __repr__(self) -> str: + """ + Override __repr__ to return the display name. + """ + return self.__display_name__ + + +def _make_subclass( + name: str, + module: str, + superclass: Any = MockObject, + attributes: Any = None, + decorator_args: tuple = (), +) -> Any: + """ + Utility method that creates a mock subclass on the fly given its + parameters. + """ + attrs = { + "__module__": module, + "__display_name__": module + "." + name, + "__name__": name, + "__decorator_args__": decorator_args, + } + + attrs.update(attributes or {}) + return type(name, (superclass,), attrs) + + +# pylint: disable=too-few-public-methods +class MockModule(ModuleType): + """ + Object that can be used to mock any module. + """ + + __file__ = os.devnull + + def __init__(self, name: str): + super().__init__(name) + self.__all__ = [] + self.__path__ = [] + + def __getattr__(self, name: str): + """ + Override __getattr__ so it always returns a new MockObject. + """ + return _make_subclass(name, self.__name__)() + + def __mro_entries__(self, _: tuple) -> tuple: + """ + Override __mro_entries__ so it always returns a tuple containing the + class itself. + """ + return (self.__class__,) + + +class MockFinder(MetaPathFinder): + """A finder for mocking.""" + + def __init__(self, modules: Sequence[str]) -> None: + super().__init__() + self.modules = modules + self.loader = MockLoader(self) + self.mocked_modules: List[str] = [] + + def find_spec( + self, + fullname: str, + path: Sequence[Optional[bytes]] | None, + target: Optional[ModuleType] = None, + ) -> ModuleSpec | None: + for modname in self.modules: + # check if fullname is (or is a descendant of) one of our targets + if modname == fullname or fullname.startswith(modname + "."): + return ModuleSpec(fullname, self.loader) + + return None + + def invalidate_caches(self) -> None: + """Invalidate mocked modules on sys.modules.""" + for modname in self.mocked_modules: + sys.modules.pop(modname, None) + + +class MockLoader(Loader): + """A loader for mocking.""" + + def __init__(self, finder: MockFinder) -> None: + super().__init__() + self.finder = finder + + def create_module(self, spec: ModuleSpec) -> ModuleType: + self.finder.mocked_modules.append(spec.name) + return MockModule(spec.name) + + def exec_module(self, module: ModuleType) -> None: + pass # nothing to do + + +@contextmanager +def mock(*modules: str) -> Generator[None, None, None]: + """ + Insert mock modules during context:: + + with mock('target.module.name'): + # mock modules are enabled here + ... + """ + finder = None + try: + finder = MockFinder(modules) + sys.meta_path.insert(0, finder) + yield + finally: + if finder: + sys.meta_path.remove(finder) + finder.invalidate_caches() diff --git a/platypush/utils/reflection/__init__.py b/platypush/utils/reflection/__init__.py new file mode 100644 index 0000000000..09c0878389 --- /dev/null +++ b/platypush/utils/reflection/__init__.py @@ -0,0 +1,318 @@ +import contextlib +import inspect +import os +import re +import textwrap as tw +from dataclasses import dataclass, field +from importlib.machinery import SourceFileLoader +from importlib.util import spec_from_loader, module_from_spec +from typing import Optional, Type, Union, Callable, Dict, Set + +from platypush.utils import ( + get_backend_class_by_name, + get_backend_name_by_class, + get_plugin_class_by_name, + get_plugin_name_by_class, + get_decorators, +) +from platypush.utils.manifest import Manifest, ManifestType, Dependencies +from platypush.utils.reflection._parser import DocstringParser, Parameter + + +class Action(DocstringParser): + """ + Represents an integration action. + """ + + +class Constructor(DocstringParser): + """ + Represents an integration constructor. + """ + + @classmethod + def parse(cls, obj: Union[Type, Callable]) -> "Constructor": + """ + Parse the parameters of a class constructor or action method. + + :param obj: Base type of the object. + :return: The parsed parameters. + """ + init = getattr(obj, "__init__", None) + if init and callable(init): + return super().parse(init) + + return super().parse(obj) + + +@dataclass +class IntegrationMetadata: + """ + Represents the metadata of an integration (plugin or backend). + """ + + _class_type_re = re.compile(r"^[\w_]+)'>$") + + name: str + type: Type + doc: Optional[str] = None + constructor: Optional[Constructor] = None + actions: Dict[str, Action] = field(default_factory=dict) + _manifest: Optional[Manifest] = None + _skip_manifest: bool = False + + def __post_init__(self): + if not self._skip_manifest: + self._init_manifest() + + @staticmethod + def _merge_params(params: Dict[str, Parameter], new_params: Dict[str, Parameter]): + """ + Utility function to merge a new mapping of parameters into an existing one. + """ + for param_name, param in new_params.items(): + # Set the parameter if it doesn't exist + if param_name not in params: + params[param_name] = param + + # Set the parameter documentation if it's not set + if param.doc and not params[param_name].doc: + params[param_name].doc = param.doc + + @classmethod + def _merge_actions(cls, actions: Dict[str, Action], new_actions: Dict[str, Action]): + """ + Utility function to merge a new mapping of actions into an existing one. + """ + for action_name, action in new_actions.items(): + # Set the action if it doesn't exist + if action_name not in actions: + actions[action_name] = action + + # Set the action documentation if it's not set + if action.doc and not actions[action_name].doc: + actions[action_name].doc = action.doc + + # Merge the parameters + cls._merge_params(actions[action_name].params, action.params) + + @classmethod + def _merge_events(cls, events: Set[Type], new_events: Set[Type]): + """ + Utility function to merge a new mapping of actions into an existing one. + """ + events.update(new_events) + + @classmethod + def by_name(cls, name: str) -> "IntegrationMetadata": + """ + :param name: Integration name. + :return: A parsed Integration class given its type. + """ + type = ( + get_backend_class_by_name(".".join(name.split(".")[1:])) + if name.startswith("backend.") + else get_plugin_class_by_name(name) + ) + return cls.by_type(type) + + @classmethod + def by_type(cls, type: Type, _skip_manifest: bool = False) -> "IntegrationMetadata": + """ + :param type: Integration type (plugin or backend). + :param _skip_manifest: Whether we should skip parsing the manifest file for this integration + (you SHOULDN'T use this flag outside of this class!). + :return: A parsed Integration class given its type. + """ + from platypush.backend import Backend + from platypush.plugins import Plugin + + assert issubclass( + type, (Plugin, Backend) + ), f"Expected a Plugin or Backend class, got {type}" + + name = ( + get_plugin_name_by_class(type) + if issubclass(type, Plugin) + else "backend." + get_backend_name_by_class(type) + ) + + assert name + obj = cls( + name=name, + type=type, + doc=inspect.getdoc(type), + constructor=Constructor.parse(type), + actions={ + name: Action.parse(getattr(type, name)) + for name in get_decorators(type, climb_class_hierarchy=True).get( + "action", [] + ) + }, + _skip_manifest=_skip_manifest, + ) + + for p_type in inspect.getmro(type)[1:]: + with contextlib.suppress(AssertionError): + p_obj = cls.by_type(p_type, _skip_manifest=True) + # Merge constructor parameters + if obj.constructor and p_obj.constructor: + cls._merge_params(obj.constructor.params, p_obj.constructor.params) + + # Merge actions + cls._merge_actions(obj.actions, p_obj.actions) + # Merge events + try: + cls._merge_events(obj.events, p_obj.events) + except FileNotFoundError: + pass + + return obj + + @property + def cls(self) -> Optional[Type]: + """ + :return: The class of an integration. + """ + manifest_type = self.manifest.package.split(".")[1] + if manifest_type == "backend": + getter = get_backend_class_by_name + elif manifest_type == "plugins": + getter = get_plugin_class_by_name + else: + return None + + return getter(".".join(self.manifest.package.split(".")[2:])) + + @classmethod + def from_manifest(cls, manifest_file: str) -> "IntegrationMetadata": + """ + Create an `IntegrationMetadata` object from a manifest file. + + :param manifest_file: Path of the manifest file. + :return: A parsed Integration class given its manifest file. + """ + manifest = Manifest.from_file(manifest_file) + name = ".".join( + [ + "backend" if manifest.manifest_type == ManifestType.BACKEND else "", + *manifest.package.split(".")[2:], + ] + ).strip(".") + + return cls.by_name(name) + + def _init_manifest(self) -> Manifest: + """ + Initialize the manifest object. + """ + if not self._manifest: + self._manifest = Manifest.from_file(self.manifest_file) + return self._manifest + + @classmethod + def _type_str(cls, param_type) -> str: + """ + Utility method to pretty-print the type string of a parameter. + """ + type_str = str(param_type).replace("typing.", "") + if m := cls._class_type_re.match(type_str): + return m.group("name") + + return type_str + + @property + def manifest(self) -> Manifest: + """ + :return: The parsed Manifest object. + """ + return self._init_manifest() + + @property + def manifest_file(self) -> str: + """ + :return: Path of the manifest file for the integration. + """ + return os.path.join( + os.path.dirname(inspect.getfile(self.type)), "manifest.yaml" + ) + + @property + def description(self) -> Optional[str]: + """ + :return: The description of the integration. + """ + return self.manifest.description + + @property + def events(self) -> Set[Type]: + """ + :return: Events triggered by the integration. + """ + return set(self.manifest.events) + + @property + def deps(self) -> Dependencies: + """ + :return: Dependencies of the integration. + """ + return self.manifest.install + + @classmethod + def _indent_yaml_comment(cls, s: str) -> str: + return tw.indent( + "\n".join( + [ + line if line.startswith("#") else f"# {line}" + for line in s.split("\n") + ] + ), + " ", + ) + + @property + def config_snippet(self) -> str: + """ + :return: A YAML snippet with the configuration parameters of the integration. + """ + return tw.dedent( + self.name + + ":\n" + + ( + "\n".join( + f' # [{"Required" if param.required else "Optional"}]\n' + + (f"{self._indent_yaml_comment(param.doc)}" if param.doc else "") + + "\n " + + ("# " if not param.required else "") + + f"{name}: " + + (str(param.default) if param.default is not None else "") + + ( + self._indent_yaml_comment(f"type={self._type_str(param.type)}") + if param.type + else "" + ) + + "\n" + for name, param in self.constructor.params.items() + ) + if self.constructor and self.constructor.params + else " # No configuration required\n" + ) + ) + + +def import_file(path: str, name: Optional[str] = None): + """ + Import a Python file as a module, even if no __init__.py is + defined in the directory. + + :param path: Path of the file to import. + :param name: Custom name for the imported module (default: same as the file's basename). + :return: The imported module. + """ + name = name or re.split(r"\.py$", os.path.basename(path))[0] + loader = SourceFileLoader(name, os.path.expanduser(path)) + mod_spec = spec_from_loader(name, loader) + assert mod_spec, f"Cannot create module specification for {path}" + mod = module_from_spec(mod_spec) + loader.exec_module(mod) + return mod diff --git a/platypush/utils/reflection/_parser.py b/platypush/utils/reflection/_parser.py new file mode 100644 index 0000000000..57f07b8acc --- /dev/null +++ b/platypush/utils/reflection/_parser.py @@ -0,0 +1,233 @@ +import inspect +import re +import textwrap as tw +from contextlib import contextmanager +from dataclasses import dataclass, field +from enum import IntEnum +from typing import ( + Any, + Optional, + Iterable, + Type, + get_type_hints, + Callable, + Tuple, + Generator, + Dict, +) + + +@dataclass +class ReturnValue: + """ + Represents the return value of an action. + """ + + doc: Optional[str] = None + type: Optional[Type] = None + + +@dataclass +class Parameter: + """ + Represents an integration constructor/action parameter. + """ + + name: str + required: bool = False + doc: Optional[str] = None + type: Optional[Type] = None + default: Optional[str] = None + + +class ParseState(IntEnum): + """ + Parse state. + """ + + DOC = 0 + PARAM = 1 + TYPE = 2 + RETURN = 3 + + +@dataclass +class ParseContext: + """ + Runtime parsing context. + """ + + obj: Callable + state: ParseState = ParseState.DOC + cur_param: Optional[str] = None + doc: Optional[str] = None + returns: ReturnValue = field(default_factory=ReturnValue) + parsed_params: dict[str, Parameter] = field(default_factory=dict) + + def __post_init__(self): + annotations = getattr(self.obj, "__annotations__", {}) + if annotations: + self.returns.type = annotations.get("return") + + @property + def spec(self) -> inspect.FullArgSpec: + return inspect.getfullargspec(self.obj) + + @property + def param_names(self) -> Iterable[str]: + return self.spec.args[1:] + + @property + def param_defaults(self) -> Tuple[Any]: + defaults = self.spec.defaults or () + return ((Any,) * (len(self.spec.args[1:]) - len(defaults))) + defaults + + @property + def param_types(self) -> dict[str, Type]: + return get_type_hints(self.obj) + + @property + def doc_lines(self) -> Iterable[str]: + return tw.dedent(inspect.getdoc(self.obj) or "").split("\n") + + +class DocstringParser: + """ + Mixin for objects that can parse docstrings. + """ + + _param_doc_re = re.compile(r"^:param\s+(?P[\w_]+):\s+(?P.*)$") + _type_doc_re = re.compile(r"^:type\s+[\w_]+:.*$") + _return_doc_re = re.compile(r"^:return:\s+(?P.*)$") + + def __init__( + self, + name: str, + doc: Optional[str] = None, + params: Optional[Dict[str, Parameter]] = None, + returns: Optional[ReturnValue] = None, + ): + self.name = name + self.doc = doc + self.params = params or {} + self.returns = returns + + @classmethod + @contextmanager + def _parser(cls, obj: Callable) -> Generator[ParseContext, None, None]: + """ + Manages the parsing context manager. + + :param obj: Method to parse. + :return: The parsing context. + """ + + def norm_indent(text: Optional[str]) -> Optional[str]: + """ + Normalize the indentation of a docstring. + + :param text: Input docstring + :return: A representation of the docstring where all the leading spaces have been removed. + """ + if not text: + return None + + lines = text.split("\n") + return (lines[0] + tw.dedent("\n".join(lines[1:]) or "")).strip() + + ctx = ParseContext(obj) + yield ctx + + # Normalize the parameters docstring indentation + for param in ctx.parsed_params.values(): + param.doc = norm_indent(param.doc) + + # Normalize the return docstring indentation + ctx.returns.doc = norm_indent(ctx.returns.doc) + + @staticmethod + def _is_continuation_line(line: str) -> bool: + return not line.strip() or line.startswith(" ") + + @classmethod + def _parse_line(cls, line: str, ctx: ParseContext): + """ + Parse a single line of the docstring and updates the parse context accordingly. + + :param line: Docstring line. + :param ctx: Parse context. + """ + # Ignore old in-doc type hints + if cls._type_doc_re.match(line) or ( + ctx.state == ParseState.TYPE and cls._is_continuation_line(line) + ): + ctx.state = ParseState.TYPE + return + + # Update the return type docstring if required + m = cls._return_doc_re.match(line) + if m or (ctx.state == ParseState.RETURN and cls._is_continuation_line(line)): + ctx.state = ParseState.RETURN + ctx.returns.doc = ((ctx.returns.doc + "\n") if ctx.returns.doc else "") + ( + m.group("doc") if m else line + ).rstrip() + return + + # Create a new parameter entry if the docstring says so + m = cls._param_doc_re.match(line) + if m: + ctx.state = ParseState.PARAM + idx = len(ctx.parsed_params) + ctx.cur_param = m.group("name") + if ctx.cur_param not in ctx.param_names: + return + + ctx.parsed_params[ctx.cur_param] = Parameter( + name=ctx.cur_param, + required=( + idx >= len(ctx.param_defaults) or ctx.param_defaults[idx] is Any + ), + doc=m.group("doc"), + type=ctx.param_types.get(ctx.cur_param), + default=ctx.param_defaults[idx] + if idx < len(ctx.param_defaults) and ctx.param_defaults[idx] is not Any + else None, + ) + return + + # Update the current parameter docstring if required + if ( + ctx.state == ParseState.PARAM + and cls._is_continuation_line(line) + and ctx.cur_param in ctx.parsed_params + ): + ctx.parsed_params[ctx.cur_param].doc = ( + ((ctx.parsed_params[ctx.cur_param].doc or "") + "\n" + line.rstrip()) + if ctx.parsed_params.get(ctx.cur_param) + and ctx.parsed_params[ctx.cur_param].doc + else "" + ) + return + + # Update the current docstring if required + ctx.cur_param = None + ctx.doc = ((ctx.doc + "\n") if ctx.doc else "") + line.rstrip() + ctx.state = ParseState.DOC + + @classmethod + def parse(cls, obj: Callable): + """ + Parse the parameters of a class constructor or action method. + :param obj: Method to parse. + :return: The parsed parameters. + """ + with cls._parser(obj) as ctx: + for line in ctx.doc_lines: + cls._parse_line(line, ctx) + + return cls( + name=obj.__name__, + doc=ctx.doc, + params=ctx.parsed_params, + returns=ctx.returns, + )