A major rewrite of the `inspect` plugin.

- The `inspect` plugin and the Sphinx inspection extensions now use the
  same underlying logic.

- Moved all the common inspection logic under
  `platypush.common.reflection`.

- Faster scanning of the available integrations and components through a
  pool of threads.

- Added `doc_url` parameters.

- Migrated events and responses metadata scanning logic.

- Now expanding some custom Sphinx tag instead of returning errors when
  running outside of the Sphinx context - it includes `:class:`,
  `:meth:` and `.. schema::`.
This commit is contained in:
Fabio Manganiello 2023-10-09 01:22:04 +02:00
parent 9acd71944c
commit 53bdcb9604
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
23 changed files with 841 additions and 940 deletions

View File

@ -1,18 +1,18 @@
import importlib
import inspect
import os
import sys
from typing import Iterable, Optional
import pkgutil
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event import Event
from platypush.message.response import Response
from platypush.plugins import Plugin
from platypush.utils.manifest import Manifests
def _get_inspect_plugin():
p = get_plugin('inspect')
assert p, 'Could not load the `inspect` plugin'
return p
def get_all_plugins():
return sorted([mf.component_name for mf in Manifests.by_base_class(Plugin)])
@ -22,11 +22,35 @@ def get_all_backends():
def get_all_events():
return _get_inspect_plugin().get_all_events().output
return _get_modules(Event)
def get_all_responses():
return _get_inspect_plugin().get_all_responses().output
return _get_modules(Response)
def _get_modules(base_type: type):
ret = set()
base_dir = os.path.dirname(inspect.getfile(base_type))
package = base_type.__module__
for _, mod_name, _ in pkgutil.walk_packages([base_dir], prefix=package + '.'):
try:
module = importlib.import_module(mod_name)
except Exception:
print('Could not import module', mod_name, file=sys.stderr)
continue
for _, obj_type in inspect.getmembers(module):
if (
inspect.isclass(obj_type)
and issubclass(obj_type, base_type)
# Exclude the base_type itself
and obj_type != base_type
):
ret.add(obj_type.__module__.replace(package + '.', '', 1))
return list(ret)
def _generate_components_doc(
@ -122,7 +146,7 @@ def generate_events_doc():
_generate_components_doc(
index_name='events',
package_name='message.event',
components=sorted(event for event in get_all_events().keys() if event),
components=sorted(event for event in get_all_events() if event),
)
@ -130,9 +154,7 @@ def generate_responses_doc():
_generate_components_doc(
index_name='responses',
package_name='message.response',
components=sorted(
response for response in get_all_responses().keys() if response
),
components=sorted(response for response in get_all_responses() if response),
)

View File

@ -23,7 +23,7 @@ class MidiBackend(Backend):
"""
:param device_name: Name of the MIDI device. *N.B.* either
`device_name` or `port_number` must be set.
Use :meth:`platypush.plugins.midi.query_ports` to get the
Use :meth:`platypush.plugins.midi.MidiPlugin.query_ports` to get the
available ports indices and names
:type device_name: str

View File

@ -1,6 +1,7 @@
from ._model import Integration
from ._model import Integration, Message
__all__ = [
"Integration",
"Message",
]

View File

@ -2,6 +2,7 @@ from .action import Action
from .argument import Argument
from .constructor import Constructor
from .integration import Integration
from .message import Message
from .returns import ReturnValue
@ -10,5 +11,6 @@ __all__ = [
"Argument",
"Constructor",
"Integration",
"Message",
"ReturnValue",
]

View File

@ -0,0 +1,68 @@
from abc import ABC, abstractmethod
from typing import Dict, Type
from .argument import Argument
class Component(ABC):
"""
Abstract interface for all the application components exposed through the
`inspect` plugin.
It includes integrations (plugins and backends) and messages (events and
responses).
"""
@staticmethod
def _merge_params(params: Dict[str, Argument], new_params: Dict[str, Argument]):
"""
Utility function to merge a new mapping of parameters into an existing one.
"""
for param_name, param in new_params.items():
# Set the parameter if it doesn't exist
if param_name not in params:
params[param_name] = param
# Set the parameter documentation if it's not set
if param.doc and not params[param_name].doc:
params[param_name].doc = param.doc
# If the new parameter has required=False,
# then that should also be the value for the current ones
if param.required is False:
params[param_name].required = False
# If the new parameter has a default value, and the current
# one doesn't, then the default value should be set as the new one.
if param.default is not None and params[param_name].default is None:
params[param_name].default = param.default
@classmethod
@abstractmethod
def by_name(cls, name: str) -> "Component":
"""
:param name: Component type name.
:return: A parsed component class given its name/type name.
"""
@classmethod
@abstractmethod
def by_type(cls, type: Type) -> "Component":
"""
:param type: Component type.
:return: A parsed component class given its type.
"""
@property
@abstractmethod
def cls(self) -> Type:
"""
:return: The class of a component.
"""
@property
@abstractmethod
def doc_url(self) -> str:
"""
:return: The URL of the documentation of the component.
"""

View File

@ -0,0 +1 @@
doc_base_url = 'https://docs.platypush.tech/platypush'

View File

@ -16,18 +16,18 @@ from platypush.utils import (
from platypush.utils.manifest import Manifest, ManifestType, Dependencies
from .._serialize import Serializable
from . import Constructor, Action, Argument
from . import Constructor, Action
from .component import Component
from .constants import doc_base_url
@dataclass
class Integration(Serializable):
class Integration(Component, Serializable):
"""
Represents the metadata of an integration (plugin or backend).
"""
_class_type_re = re.compile(r"^<class '(?P<name>[\w_]+)'>$")
_doc_base_url = 'https://docs.platypush.tech/platypush'
"""Base public URL for the documentation"""
name: str
type: Type
@ -68,7 +68,7 @@ class Integration(Serializable):
"events": {
f"{e.__module__}.{e.__qualname__}": {
"doc": inspect.getdoc(e),
"doc_url": f"{self._doc_base_url}/events/"
"doc_url": f"{doc_base_url}/events/"
+ ".".join(e.__module__.split(".")[3:])
+ f".html#{e.__module__}.{e.__qualname__}",
}
@ -77,30 +77,6 @@ class Integration(Serializable):
"deps": self.deps.to_dict(),
}
@staticmethod
def _merge_params(params: Dict[str, Argument], new_params: Dict[str, Argument]):
"""
Utility function to merge a new mapping of parameters into an existing one.
"""
for param_name, param in new_params.items():
# Set the parameter if it doesn't exist
if param_name not in params:
params[param_name] = param
# Set the parameter documentation if it's not set
if param.doc and not params[param_name].doc:
params[param_name].doc = param.doc
# If the new parameter has required=False,
# then that should also be the value for the current ones
if param.required is False:
params[param_name].required = False
# If the new parameter has a default value, and the current
# one doesn't, then the default value should be set as the new one.
if param.default is not None and params[param_name].default is None:
params[param_name].default = param.default
@classmethod
def _merge_actions(cls, actions: Dict[str, Action], new_actions: Dict[str, Action]):
"""
@ -344,19 +320,13 @@ class Integration(Serializable):
:return: URL of the documentation for the integration.
"""
from platypush.backend import Backend
from platypush.message.event import Event
from platypush.message.response import Response
from platypush.plugins import Plugin
if issubclass(self.type, Plugin):
section = 'plugins'
elif issubclass(self.type, Backend):
section = 'backend'
elif issubclass(self.type, Event):
section = 'events'
elif issubclass(self.type, Response):
section = 'responses'
else:
raise AssertionError(f'Unknown integration type {self.type}')
return f"{self._doc_base_url}/{section}/{self.name}.html"
return f"{doc_base_url}/{section}/{self.name}.html"

View File

@ -0,0 +1,109 @@
import contextlib
import importlib
import inspect
from dataclasses import dataclass
from typing import Type, Optional
from .._serialize import Serializable
from . import Constructor
from .component import Component
from .constants import doc_base_url
@dataclass
class Message(Component, Serializable):
"""
Represents the metadata of a message type (event or response).
"""
name: str
type: Type
doc: Optional[str] = None
constructor: Optional[Constructor] = None
def to_dict(self) -> dict:
return {
"name": self.name,
"type": f"{self.type.__module__}.{self.type.__qualname__}",
"doc": self.doc,
"doc_url": self.doc_url,
"args": {
**(
{name: arg.to_dict() for name, arg in self.constructor.args.items()}
if self.constructor
else {}
),
},
}
@classmethod
def by_name(cls, name: str) -> "Message":
"""
:param name: Message type name.
:return: A parsed message class given its type.
"""
return cls.by_type(cls._get_cls(name))
@classmethod
def by_type(cls, type: Type) -> "Message":
"""
:param type: Message type.
:return: A parsed message class given its type.
"""
from platypush.message import Message as MessageClass
assert issubclass(type, MessageClass), f"Expected a Message class, got {type}"
obj = cls(
name=f'{type.__module__}.{type.__qualname__}',
type=type,
doc=inspect.getdoc(type),
constructor=Constructor.parse(type),
)
for p_type in inspect.getmro(type)[1:]:
# Don't go upper in the hierarchy.
if p_type == type:
break
with contextlib.suppress(AssertionError):
p_obj = cls.by_type(p_type)
# Merge constructor parameters
if obj.constructor and p_obj.constructor:
cls._merge_params(obj.constructor.args, p_obj.constructor.args)
return obj
@property
def cls(self) -> Type:
"""
:return: The class of a message.
"""
return self._get_cls(self.name)
@staticmethod
def _get_cls(name: str) -> Type:
"""
:param name: Full qualified type name, module included.
:return: The associated class.
"""
tokens = name.split(".")
module = importlib.import_module(".".join(tokens[:-1]))
return getattr(module, tokens[-1])
@property
def doc_url(self) -> str:
"""
:return: URL of the documentation for the message.
"""
from platypush.message.event import Event
from platypush.message.response import Response
if issubclass(self.type, Event):
section = 'events'
elif issubclass(self.type, Response):
section = 'responses'
else:
raise AssertionError(f'Unknown message type {self.type}')
mod_name = '.'.join(self.name.split('.')[3:-1])
return f"{doc_base_url}/{section}/{mod_name}.html#{self.name}"

View File

@ -1,7 +1,16 @@
import inspect
import textwrap as tw
from dataclasses import dataclass, field
from typing import Callable, Optional, Iterable, Tuple, Any, Type, get_type_hints
from typing import (
Any,
Callable,
Iterable,
List,
Optional,
Tuple,
Type,
get_type_hints,
)
from .._model.argument import Argument
from .._model.returns import ReturnValue
@ -22,17 +31,36 @@ class ParseContext:
parsed_params: dict[str, Argument] = field(default_factory=dict)
def __post_init__(self):
"""
Initialize the return type and parameters from the function annotations.
"""
# Initialize the return type from the annotations
annotations = getattr(self.obj, "__annotations__", {})
if annotations:
self.returns.type = annotations.get("return")
# Initialize the parameters from the signature
spec = inspect.getfullargspec(self.obj)
defaults = spec.defaults or ()
defaults = defaults + ((Any,) * (len(self.param_names) - len(defaults or ())))
self.parsed_params = {
name: Argument(
name=name,
type=self.param_types.get(name),
default=default if default is not Any else None,
required=default is Any,
)
for name, default in zip(self.param_names, defaults)
}
@property
def spec(self) -> inspect.FullArgSpec:
return inspect.getfullargspec(self.obj)
@property
def param_names(self) -> Iterable[str]:
return self.spec.args[1:]
def param_names(self) -> List[str]:
return list(self.spec.args[1:])
@property
def param_defaults(self) -> Tuple[Any]:

View File

@ -1,16 +1,17 @@
import re
import textwrap as tw
from contextlib import contextmanager
from typing import Optional, Dict, Callable, Generator, Any
from typing import Callable, Dict, Generator, Optional
from .._model.argument import Argument
from .._model.returns import ReturnValue
from .._serialize import Serializable
from .context import ParseContext
from .rst import RstExtensionsMixin
from .state import ParseState
class DocstringParser(Serializable):
class DocstringParser(Serializable, RstExtensionsMixin):
"""
Mixin for objects that can parse docstrings.
"""
@ -103,6 +104,9 @@ class DocstringParser(Serializable):
if cls._default_docstring.match(line):
return
# Expand any custom RST extensions
line = cls._expand_rst_extensions(line, ctx)
# Update the return type docstring if required
m = cls._return_doc_re.match(line)
if m or (ctx.state == ParseState.RETURN and cls._is_continuation_line(line)):
@ -112,28 +116,17 @@ class DocstringParser(Serializable):
).rstrip()
return
# Create a new parameter entry if the docstring says so
# Initialize the documentation of a parameter on :param: docstring lines
m = cls._param_doc_re.match(line)
if m:
if m and ctx.parsed_params.get(m.group("name")):
ctx.state = ParseState.PARAM
idx = len(ctx.parsed_params)
ctx.cur_param = m.group("name")
# Skip vararg/var keyword parameters
if ctx.cur_param in {ctx.spec.varkw, ctx.spec.varargs}:
return
ctx.parsed_params[ctx.cur_param] = Argument(
name=ctx.cur_param,
required=(
idx >= len(ctx.param_defaults) or ctx.param_defaults[idx] is Any
),
doc=m.group("doc"),
type=ctx.param_types.get(ctx.cur_param),
default=ctx.param_defaults[idx]
if idx < len(ctx.param_defaults) and ctx.param_defaults[idx] is not Any
else None,
)
ctx.parsed_params[ctx.cur_param].doc = m.group("doc")
return
# Update the current parameter docstring if required

View File

@ -0,0 +1,162 @@
import importlib
import logging
import re
import textwrap as tw
from .._model.constants import doc_base_url
from .context import ParseContext
# pylint: disable=too-few-public-methods
class RstExtensionsMixin:
"""
Mixin class for handling non-standard reStructuredText extensions.
"""
_rst_extensions = {
name: re.compile(regex)
for name, regex in {
"class": "(:class:`(?P<name>[^`]+)`)",
"method": "(:meth:`(?P<name>[^`]+)`)",
"function": "(:func:`(?P<name>[^`]+)`)",
"schema": r"^((?P<indent>\s*)(?P<before>.*)"
r"(\.\. schema:: (?P<name>[\w.]+)\s*"
r"(\((?P<args>.+?)\))?)(?P<after>.*))$",
}.items()
}
logger = logging.getLogger(__name__)
@classmethod
def _expand_rst_extensions(cls, docstr: str, ctx: ParseContext) -> str:
"""
Expand the known reStructuredText extensions in a docstring.
"""
for ex_name, regex in cls._rst_extensions.items():
match = regex.search(docstr)
if not match:
continue
try:
docstr = (
cls._expand_schema(docstr, match)
if ex_name == "schema"
else cls._expand_module(docstr, ex_name, match, ctx)
)
except Exception as e:
cls.logger.warning(
"Could not import module %s: %s", match.group("name"), e
)
continue
return docstr
@classmethod
def _expand_schema(cls, docstr: str, match: re.Match) -> str:
from marshmallow import missing
from marshmallow.validate import OneOf
value = match.group("name")
mod = importlib.import_module(
"platypush.schemas." + ".".join(value.split(".")[:-1])
)
obj_cls = getattr(mod, value.split(".")[-1])
schema_args = (
eval(f'dict({match.group("args")})') if match.group("args") else {}
)
obj = obj_cls(**schema_args)
schema_doc = tw.indent(
".. code-block:: python\n\n"
+ tw.indent(
("[" if obj.many else "")
+ "{\n"
+ tw.indent(
"\n".join(
(
(
"# " + field.metadata["description"] + "\n"
if field.metadata.get("description")
else ""
)
+ (
"# Possible values: "
+ str(field.validate.choices)
+ "\n"
if isinstance(field.validate, OneOf)
else ""
)
+ f'"{field_name}": '
+ (
(
'"'
+ field.metadata.get("example", field.default)
+ '"'
if isinstance(
field.metadata.get("example", field.default),
str,
)
else str(
field.metadata.get("example", field.default)
)
)
if not (
field.metadata.get("example") is None
and field.default is missing
)
else "..."
)
)
for field_name, field in obj.fields.items()
),
prefix=" ",
)
+ "\n}"
+ ("]" if obj.many else ""),
prefix=" ",
),
prefix=match.group("indent") + " ",
)
docstr = docstr.replace(
match.group(0),
match.group("before") + "\n\n" + schema_doc + "\n\n" + match.group("after"),
)
return docstr
@classmethod
def _expand_module(
cls, docstr: str, ex_name: str, match: re.Match, ctx: ParseContext
) -> str:
value = match.group("name")
if value.startswith("."):
modname = ctx.obj.__module__ # noqa
obj_name = ctx.obj.__qualname__
elif ex_name == "method":
modname = ".".join(value.split(".")[:-2])
obj_name = ".".join(value.split(".")[-2:])
else:
modname = ".".join(value.split(".")[:-1])
obj_name = value.split(".")[-1]
url_path = None
if modname.startswith("platypush.plugins"):
url_path = "plugins/" + ".".join(modname.split(".")[2:])
elif modname.startswith("platypush.backend"):
url_path = "backends/" + ".".join(modname.split(".")[2:])
elif modname.startswith("platypush.message.event"):
url_path = "events/" + ".".join(modname.split(".")[3:])
elif modname.startswith("platypush.message.response"):
url_path = "responses/" + ".".join(modname.split(".")[3:])
if url_path:
docstr = docstr.replace(
match.group(0),
f"`{obj_name} <{doc_base_url}/{url_path}.html#{modname}.{obj_name}>`_",
)
else:
docstr = docstr.replace(match.group(0), f"``{value}``")
return docstr

View File

@ -1,36 +1,24 @@
from collections import defaultdict
import importlib
import inspect
import json
import os
import pathlib
import pickle
import pkgutil
from types import ModuleType
from typing import Callable, Dict, Generator, Optional, Type, Union
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Optional
from platypush.backend import Backend
from platypush.common.db import override_definitions
from platypush.common.reflection import Integration, Message as MessageMetadata
from platypush.config import Config
from platypush.plugins import Plugin, action
from platypush.message import Message
from platypush.message.event import Event
from platypush.message.response import Response
from platypush.utils import (
get_backend_class_by_name,
get_backend_name_by_class,
get_plugin_class_by_name,
get_plugin_name_by_class,
)
from platypush.utils.manifest import Manifests
from platypush.utils.mock import auto_mocks
from platypush.utils.manifest import Manifest, Manifests
from ._context import ComponentContext
from ._model import (
BackendModel,
EventModel,
Model,
PluginModel,
ResponseModel,
)
from ._cache import Cache
from ._serialize import ProcedureEncoder
@ -39,297 +27,211 @@ class InspectPlugin(Plugin):
This plugin can be used to inspect platypush plugins and backends
"""
_num_workers = 8
"""Number of threads to use for the inspection."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._components_cache_file = os.path.join(
Config.get('workdir'), # type: ignore
'components.cache', # type: ignore
)
self._components_context: Dict[type, ComponentContext] = defaultdict(
ComponentContext
)
self._components_cache: Dict[type, dict] = defaultdict(dict)
self._load_components_cache()
self._cache_file = os.path.join(Config.get_cachedir(), 'components.json')
self._cache = Cache()
self._load_cache()
def _load_components_cache(self):
def _load_cache(self):
"""
Loads the components cache from disk.
"""
try:
with open(self._components_cache_file, 'rb') as f:
self._components_cache = pickle.load(f)
except Exception as e:
self.logger.warning('Could not initialize the components cache: %s', e)
self.logger.info(
'The plugin will initialize the cache by scanning '
'the integrations at the next run. This may take a while'
)
def _flush_components_cache(self):
"""
Flush the current components cache to disk.
"""
with open(self._components_cache_file, 'wb') as f:
pickle.dump(self._components_cache, f)
def _get_cached_component(
self, base_type: type, comp_type: type
) -> Optional[Model]:
"""
Retrieve a cached component's ``Model``.
:param base_type: The base type of the component (e.g. ``Plugin`` or
``Backend``).
:param comp_type: The specific type of the component (e.g.
``MusicMpdPlugin`` or ``HttpBackend``).
:return: The cached component's ``Model`` if it exists, otherwise null.
"""
return self._components_cache.get(base_type, {}).get(comp_type)
def _cache_component(
self,
base_type: type,
comp_type: type,
model: Model,
index_by_module: bool = False,
):
"""
Cache the ``Model`` object for a component.
:param base_type: The base type of the component (e.g. ``Plugin`` or
``Backend``).
:param comp_type: The specific type of the component (e.g.
``MusicMpdPlugin`` or ``HttpBackend``).
:param model: The ``Model`` object to cache.
:param index_by_module: If ``True``, the ``Model`` object will be
indexed according to the ``base_type -> module -> comp_type``
mapping, otherwise ``base_type -> comp_type``.
"""
if index_by_module:
if not self._components_cache.get(base_type, {}).get(model.package):
self._components_cache[base_type][model.package] = {}
self._components_cache[base_type][model.package][comp_type] = model
else:
self._components_cache[base_type][comp_type] = model
def _scan_integrations(self, base_type: type):
"""
A generator that scans the manifest files given a ``base_type``
(``Plugin`` or ``Backend``) and yields the parsed submodules.
"""
for manifest in Manifests.by_base_class(base_type):
with self._cache.lock(), auto_mocks(), override_definitions():
try:
yield importlib.import_module(manifest.package)
self._cache = Cache.load(self._cache_file)
except Exception as e:
self.logger.debug(
'Could not import module %s: %s',
manifest.package,
self.logger.warning(
'Could not initialize the components cache from %s: %s',
self._cache_file,
e,
)
continue
self._cache = Cache()
def _scan_modules(self, base_type: type) -> Generator[ModuleType, None, None]:
self._refresh_cache()
def _refresh_cache(self):
"""
A generator that scan the modules given a ``base_type`` (e.g. ``Event``).
Refreshes the components cache.
"""
cache_version_differs = self._cache.version != Cache.cur_version
Unlike :meth:`._scan_integrations`, this method recursively scans the
modules using ``pkgutil`` instead of using the information provided in
the integrations' manifest files.
with ThreadPoolExecutor(self._num_workers) as pool:
futures = []
for base_type in [Plugin, Backend]:
futures.append(
pool.submit(
self._scan_integrations,
base_type,
pool=pool,
force_refresh=cache_version_differs,
futures=futures,
)
)
for base_type in [Event, Response]:
futures.append(
pool.submit(
self._scan_modules,
base_type,
pool=pool,
force_refresh=cache_version_differs,
futures=futures,
)
)
while futures:
futures.pop().result()
if self._cache.has_changes:
self.logger.info('Saving new components cache to %s', self._cache_file)
self._cache.dump(self._cache_file)
self._cache.loaded_at = self._cache.saved_at
def _scan_integration(self, manifest: Manifest):
"""
Scans a single integration from the manifest and adds it to the cache.
"""
try:
self._cache_integration(Integration.from_manifest(manifest.file))
except Exception as e:
self.logger.warning(
'Could not import module %s: %s',
manifest.package,
e,
)
def _scan_integrations(
self,
base_type: type,
pool: ThreadPoolExecutor,
futures: List[Future],
force_refresh: bool = False,
):
"""
Scans the integrations with a manifest file (plugins and backends) and
refreshes the cache.
"""
for manifest in Manifests.by_base_class(base_type):
# An integration metadata needs to be refreshed if it's been
# modified since it was last loaded, or if it's not in the
# cache.
if force_refresh or self._needs_refresh(manifest.file):
futures.append(pool.submit(self._scan_integration, manifest))
def _scan_module(self, base_type: type, modname: str):
"""
Scans a single module for objects that match the given base_type and
adds them to the cache.
"""
try:
module = importlib.import_module(modname)
except Exception as e:
self.logger.warning('Could not import module %s: %s', modname, e)
return
for _, obj_type in inspect.getmembers(module):
if (
inspect.isclass(obj_type)
and issubclass(obj_type, base_type)
# Exclude the base_type itself
and obj_type != base_type
):
self.logger.info(
'Scanned %s: %s',
base_type.__name__,
f'{module.__name__}.{obj_type.__name__}',
)
self._cache.set(
base_type, obj_type, MessageMetadata.by_type(obj_type).to_dict()
)
def _scan_modules(
self,
base_type: type,
pool: ThreadPoolExecutor,
futures: List[Future],
force_refresh: bool = False,
):
"""
A generator that scans the modules given a ``base_type`` (e.g. ``Event``).
It's a bit more inefficient than :meth:`._scan_integrations` because it
needs to inspect all the members of a module to find the ones that
match the given ``base_type``, but it works fine for simple components
(like messages) that don't require extra recursive parsing and don't
have a manifest.
"""
prefix = base_type.__module__ + '.'
path = str(pathlib.Path(inspect.getfile(base_type)).parent)
for _, modname, _ in pkgutil.walk_packages(
for _, modname, __ in pkgutil.walk_packages(
path=[path], prefix=prefix, onerror=lambda _: None
):
try:
yield importlib.import_module(modname)
filename = self._module_filename(path, '.'.join(modname.split('.')[3:]))
if not (force_refresh or self._needs_refresh(filename)):
continue
except Exception as e:
self.logger.debug('Could not import module %s: %s', modname, e)
self.logger.warning('Could not scan module %s: %s', modname, e)
continue
def _init_component(
self,
base_type: type,
comp_type: type,
model_type: Type[Model],
index_by_module: bool = False,
) -> Model:
futures.append(pool.submit(self._scan_module, base_type, modname))
def _needs_refresh(self, filename: str) -> bool:
"""
Initialize a component's ``Model`` object and cache it.
:param base_type: The base type of the component (e.g. ``Plugin`` or
``Backend``).
:param comp_type: The specific type of the component (e.g.
``MusicMpdPlugin`` or ``HttpBackend``).
:param model_type: The type of the ``Model`` object that should be
created.
:param index_by_module: If ``True``, the ``Model`` object will be
indexed according to the ``base_type -> module -> comp_type``
mapping, otherwise ``base_type -> comp_type``.
:return: The initialized component's ``Model`` object.
:return: True if the given file needs to be refreshed in the cache.
"""
prefix = base_type.__module__ + '.'
comp_file = inspect.getsourcefile(comp_type)
model = None
mtime = None
if comp_file:
mtime = os.stat(comp_file).st_mtime
cached_model = self._get_cached_component(base_type, comp_type)
# Only update the component model if its source file was
# modified since the last time it was scanned
if (
cached_model
and cached_model.last_modified
and mtime <= cached_model.last_modified
):
model = cached_model
if not model:
self.logger.info('Scanning component %s', comp_type.__name__)
model = model_type(comp_type, prefix=prefix, last_modified=mtime)
self._cache_component(
base_type, comp_type, model, index_by_module=index_by_module
)
return model
def _init_modules(
self,
base_type: type,
model_type: Type[Model],
):
"""
Initializes, parses and caches all the components of a given type.
Unlike :meth:`._scan_integrations`, this method inspects all the
members of a ``module`` for those that match the given ``base_type``
instead of relying on the information provided in the manifest.
It is a bit more inefficient, but it works fine for simple components
(like entities and messages) that don't require extra recursive parsing
logic for their docs (unlike plugins).
"""
for module in self._scan_modules(base_type):
for _, obj_type in inspect.getmembers(module):
if (
inspect.isclass(obj_type)
and issubclass(obj_type, base_type)
# Exclude the base_type itself
and obj_type != base_type
):
self._init_component(
base_type=base_type,
comp_type=obj_type,
model_type=model_type,
index_by_module=True,
)
def _init_integrations(
self,
base_type: Type[Union[Plugin, Backend]],
model_type: Type[Union[PluginModel, BackendModel]],
class_by_name: Callable[[str], Optional[type]],
):
"""
Initializes, parses and caches all the integrations of a given type.
:param base_type: The base type of the component (e.g. ``Plugin`` or
``Backend``).
:param model_type: The type of the ``Model`` objects that should be
created.
:param class_by_name: A function that returns the class of a given
integration given its qualified name.
"""
for module in self._scan_integrations(base_type):
comp_name = '.'.join(module.__name__.split('.')[2:])
comp_type = class_by_name(comp_name)
if not comp_type:
continue
self._init_component(
base_type=base_type,
comp_type=comp_type,
model_type=model_type,
)
self._flush_components_cache()
def _init_plugins(self):
"""
Initializes and caches all the available plugins.
"""
self._init_integrations(
base_type=Plugin,
model_type=PluginModel,
class_by_name=get_plugin_class_by_name,
return os.lstat(os.path.dirname(filename)).st_mtime > (
self._cache.saved_at or 0
)
def _init_backends(self):
@staticmethod
def _module_filename(path: str, modname: str) -> str:
"""
Initializes and caches all the available backends.
:param path: Path to the module.
:param modname: Module name.
:return: The full path to the module file.
"""
self._init_integrations(
base_type=Backend,
model_type=BackendModel,
class_by_name=get_backend_class_by_name,
)
filename = os.path.join(path, *modname.split('.')) + '.py'
def _init_events(self):
"""
Initializes and caches all the available events.
"""
self._init_modules(
base_type=Event,
model_type=EventModel,
)
if not os.path.isfile(filename):
filename = os.path.join(path, *modname.split('.'), '__init__.py')
def _init_responses(self):
"""
Initializes and caches all the available responses.
"""
self._init_modules(
base_type=Response,
model_type=ResponseModel,
)
assert os.path.isfile(filename), f'No such file or directory: {filename}'
return filename
def _init_components(self, base_type: type, initializer: Callable[[], None]):
def _cache_integration(self, integration: Integration) -> dict:
"""
Context manager boilerplate for the other ``_init_*`` methods.
:param integration: The :class:`.IntegrationMetadata` object.
:return: The initialized component's metadata dict.
"""
ctx = self._components_context[base_type]
with ctx.init_lock:
if not ctx.refreshed.is_set():
initializer()
ctx.refreshed.set()
self.logger.info(
'Scanned %s: %s', integration.base_type.__name__, integration.name
)
meta = integration.to_dict()
self._cache.set(integration.base_type, integration.type, meta)
return meta
@action
def get_all_plugins(self):
"""
Get information about all the available plugins.
"""
self._init_components(Plugin, self._init_plugins)
return json.dumps(
{
get_plugin_name_by_class(cls): dict(plugin)
for cls, plugin in self._components_cache.get(Plugin, {}).items()
},
cls=Message.Encoder,
)
return json.dumps(self._cache.to_dict().get('plugins', {}), cls=Message.Encoder)
@action
def get_all_backends(self):
"""
Get information about all the available backends.
"""
self._init_components(Backend, self._init_backends)
return json.dumps(
{
get_backend_name_by_class(cls): dict(backend)
for cls, backend in self._components_cache.get(Backend, {}).items()
}
self._cache.to_dict().get('backends', {}), cls=Message.Encoder
)
@action
@ -337,33 +239,15 @@ class InspectPlugin(Plugin):
"""
Get information about all the available events.
"""
self._init_components(Event, self._init_events)
return json.dumps(
{
package: {
obj_type.__name__: dict(event_model)
for obj_type, event_model in events.items()
}
for package, events in self._components_cache.get(Event, {}).items()
}
)
return json.dumps(self._cache.to_dict().get('events', {}), cls=Message.Encoder)
@action
def get_all_responses(self):
"""
Get information about all the available responses.
"""
self._init_components(Response, self._init_responses)
return json.dumps(
{
package: {
obj_type.__name__: dict(response_model)
for obj_type, response_model in responses.items()
}
for package, responses in self._components_cache.get(
Response, {}
).items()
}
self._cache.to_dict().get('responses', {}), cls=Message.Encoder
)
@action

View File

@ -0,0 +1,248 @@
from contextlib import contextmanager
import json
import logging
from collections import defaultdict
from time import time
from threading import RLock
from typing import Dict, Optional
from platypush.backend import Backend
from platypush.message.event import Event
from platypush.message.response import Response
from platypush.plugins import Plugin
from platypush.utils import (
get_backend_class_by_name,
get_backend_name_by_class,
get_plugin_class_by_name,
get_plugin_name_by_class,
)
logger = logging.getLogger(__name__)
class Cache:
"""
A cache for the parsed integration metadata.
Cache structure:
.. code-block:: python
{
<integration_category>: {
<integration_type>: {
'doc': <integration_docstring>,
'args': {
<arg_name>: {
'name': <arg_name>,
'type': <arg_type>,
'doc': <arg_docstring>,
'default': <arg_default_value>,
'required': <arg_required>,
},
...
},
'actions': {
<action_name>: {
'name': <action_name>,
'doc': <action_docstring>,
'args': {
...
},
'returns': {
'type': <return_type>,
'doc': <return_docstring>,
},
},
...
},
'events': [
<event_type1>,
<event_type2>,
...
],
},
...
},
...
}
"""
cur_version = 1
"""
Cache version, used to detect breaking changes in the cache logic that require a cache refresh.
"""
def __init__(
self,
items: Optional[Dict[type, Dict[type, dict]]] = None,
saved_at: Optional[float] = None,
loaded_at: Optional[float] = None,
version: int = cur_version,
):
self.saved_at = saved_at
self.loaded_at = loaded_at
self._cache: Dict[type, Dict[type, dict]] = defaultdict(dict)
self._lock = RLock()
self.version = version
self.has_changes = False
if items:
self._cache.update(items)
self.loaded_at = time()
@classmethod
def load(cls, cache_file: str) -> 'Cache':
"""
Loads the components cache from disk.
:param cache_file: Cache file path.
"""
with open(cache_file, 'r') as f:
data = json.load(f)
return cls.from_dict(data)
def dump(self, cache_file: str):
"""
Dumps the components cache to disk.
:param cache_file: Cache file path.
"""
from platypush.message import Message
self.version = self.cur_version
with open(cache_file, 'w') as f:
self.saved_at = time()
json.dump(
{
'saved_at': self.saved_at,
'version': self.version,
'items': self.to_dict(),
},
f,
cls=Message.Encoder,
)
self.has_changes = False
@classmethod
def from_dict(cls, data: dict) -> 'Cache':
"""
Creates a cache from a JSON-serializable dictionary.
"""
return cls(
items={
Backend: {
k: v
for k, v in {
get_backend_class_by_name(backend_type): backend_meta
for backend_type, backend_meta in data.get('items', {})
.get('backends', {})
.items()
}.items()
if k
},
Plugin: {
k: v
for k, v in {
get_plugin_class_by_name(plugin_type): plugin_meta
for plugin_type, plugin_meta in data.get('items', {})
.get('plugins', {})
.items()
}.items()
if k
},
Event: data.get('items', {}).get('events', {}),
Response: data.get('items', {}).get('responses', {}),
},
loaded_at=time(),
saved_at=data.get('saved_at'),
version=data.get('version', cls.cur_version),
)
def to_dict(self) -> Dict[str, Dict[str, dict]]:
"""
Converts the cache items to a JSON-serializable dictionary.
"""
return {
'backends': {
k: v
for k, v in {
get_backend_name_by_class(backend_type): backend_meta
for backend_type, backend_meta in self.backends.items()
}.items()
if k
},
'plugins': {
k: v
for k, v in {
get_plugin_name_by_class(plugin_type): plugin_meta
for plugin_type, plugin_meta in self.plugins.items()
}.items()
if k
},
'events': {
(k if isinstance(k, str) else f'{k.__module__}.{k.__qualname__}'): v
for k, v in self.events.items()
if k
},
'responses': {
(k if isinstance(k, str) else f'{k.__module__}.{k.__qualname__}'): v
for k, v in self.responses.items()
if k
},
}
def get(self, category: type, obj_type: Optional[type] = None) -> Optional[dict]:
"""
Retrieves an object from the cache.
:param category: Category type.
:param obj_type: Object type.
:return: Object metadata.
"""
collection = self._cache[category]
if not obj_type:
return collection
return collection.get(obj_type)
def set(self, category: type, obj_type: type, value: dict):
"""
Set an object on the cache.
:param category: Category type.
:param obj_type: Object type.
:param value: Value to set.
"""
self._cache[category][obj_type] = value
self.has_changes = True
@property
def plugins(self) -> Dict[type, dict]:
"""Plugins metadata."""
return self._cache[Plugin]
@property
def backends(self) -> Dict[type, dict]:
"""Backends metadata."""
return self._cache[Backend]
@property
def events(self) -> Dict[type, dict]:
"""Events metadata."""
return self._cache[Event]
@property
def responses(self) -> Dict[type, dict]:
"""Responses metadata."""
return self._cache[Response]
@contextmanager
def lock(self):
"""
Context manager that acquires a lock on the cache.
"""
with self._lock:
yield

View File

@ -1,12 +0,0 @@
from dataclasses import dataclass, field
import threading
@dataclass
class ComponentContext:
"""
This class is used to store the context of a component type.
"""
init_lock: threading.RLock = field(default_factory=threading.RLock)
refreshed: threading.Event = field(default_factory=threading.Event)

View File

@ -1,262 +0,0 @@
import inspect
import json
import re
from typing import Callable, List, Optional, Type
from platypush.backend import Backend
from platypush.message.event import Event
from platypush.message.response import Response
from platypush.plugins import Plugin
from platypush.utils import get_decorators
from ._parsers import (
BackendParser,
EventParser,
MethodParser,
Parser,
PluginParser,
ResponseParser,
SchemaParser,
)
class Model:
"""
Base class for component models.
"""
_parsers: List[Type[Parser]] = [
BackendParser,
EventParser,
MethodParser,
PluginParser,
ResponseParser,
SchemaParser,
]
_param_docstring_re = re.compile(r'^\s*:param ([^:]+):\s*(.*)')
_type_docstring_re = re.compile(r'^\s*:type ([^:]+):\s*([^\s]+).*')
_return_docstring_re = re.compile(r'^\s*:return:\s+(.*)')
def __init__(
self,
obj_type: type,
name: Optional[str] = None,
doc: Optional[str] = None,
prefix: str = '',
last_modified: Optional[float] = None,
) -> None:
"""
:param obj_type: Type of the component.
:param name: Name of the component.
:param doc: Documentation of the component.
:param last_modified: Last modified timestamp of the component.
"""
self._obj_type = obj_type
self.package = obj_type.__module__[len(prefix) :]
self.name = name or self.package
self.last_modified = last_modified
docstring = doc or ''
if obj_type.__doc__:
docstring += '\n\n' + obj_type.__doc__
if hasattr(obj_type, '__init__'):
docstring += '\n\n' + (obj_type.__init__.__doc__ or '')
self.doc, argsdoc = self._parse_docstring(docstring, obj_type=obj_type)
self.args = {}
self.has_kwargs = False
self.has_varargs = False
for arg in list(inspect.signature(obj_type).parameters.values())[1:]:
if arg.kind == arg.VAR_KEYWORD:
self.has_kwargs = True
continue
if arg.kind == arg.VAR_POSITIONAL:
self.has_varargs = True
continue
self.args[arg.name] = {
'default': (
arg.default if not issubclass(arg.default.__class__, type) else None
),
'doc': argsdoc.get(arg.name, {}).get('name'),
'required': arg.default is inspect._empty,
'type': (
argsdoc.get(arg.name, {}).get('type')
or (
(
arg.annotation.__name__
if arg.annotation.__module__ == 'builtins'
else (
None
if arg.annotation is inspect._empty
else str(arg.annotation).replace('typing.', '')
)
)
if arg.annotation
else None
)
),
}
def __str__(self):
"""
:return: JSON string representation of the model.
"""
return json.dumps(dict(self), indent=2, sort_keys=True)
def __repr__(self):
"""
:return: JSON string representation of the model.
"""
return json.dumps(dict(self))
def __iter__(self):
"""
Iterator for the model public attributes/values pairs.
"""
for attr in ['name', 'args', 'doc', 'has_varargs', 'has_kwargs']:
yield attr, getattr(self, attr)
@classmethod
def _parse_docstring(cls, docstring: str, obj_type: type):
new_docstring = ''
params = {}
cur_param = None
cur_param_docstring = ''
param_types = {}
if not docstring:
return None, {}
for line in docstring.split('\n'):
m = cls._param_docstring_re.match(line)
if m:
if cur_param:
params[cur_param] = cur_param_docstring
cur_param = m.group(1)
cur_param_docstring = m.group(2)
continue
m = cls._type_docstring_re.match(line)
if m:
if cur_param:
param_types[cur_param] = m.group(2).strip()
params[cur_param] = cur_param_docstring
cur_param = None
continue
m = cls._return_docstring_re.match(line)
if m:
if cur_param:
params[cur_param] = cur_param_docstring
new_docstring += '\n\n**Returns:**\n\n' + m.group(1).strip() + ' '
cur_param = None
continue
if cur_param:
if not line.strip():
params[cur_param] = cur_param_docstring
cur_param = None
cur_param_docstring = ''
else:
cur_param_docstring += '\n' + line.strip() + ' '
else:
new_docstring += line + '\n'
if cur_param:
params[cur_param] = cur_param_docstring
for param, doc in params.items():
params[param] = {
'name': cls._post_process_docstring(doc, obj_type=obj_type)
}
param_type = param_types.pop(param, None)
if param_type is not None:
params[param]['type'] = param_type
return cls._post_process_docstring(new_docstring, obj_type=obj_type), params
@classmethod
def _post_process_docstring(cls, docstring: str, obj_type: type) -> str:
for parsers in cls._parsers:
docstring = parsers.parse(docstring, obj_type=obj_type)
return docstring.strip()
# pylint: disable=too-few-public-methods
class BackendModel(Model):
"""
Model for backend components.
"""
def __init__(self, obj_type: Type[Backend], *args, **kwargs):
super().__init__(obj_type, *args, **kwargs)
# pylint: disable=too-few-public-methods
class PluginModel(Model):
"""
Model for plugin components.
"""
def __init__(self, obj_type: Type[Plugin], prefix: str = '', **kwargs):
super().__init__(
obj_type,
name=re.sub(r'\._plugin$', '', obj_type.__module__[len(prefix) :]),
**kwargs,
)
self.actions = {
action_name: ActionModel(getattr(obj_type, action_name))
for action_name in get_decorators(obj_type, climb_class_hierarchy=True).get(
'action', []
)
}
def __iter__(self):
"""
Overrides the default implementation of ``__iter__`` to also include
plugin actions.
"""
for attr in ['name', 'args', 'actions', 'doc', 'has_varargs', 'has_kwargs']:
if attr == 'actions':
yield attr, {
name: dict(action) for name, action in self.actions.items()
}
else:
yield attr, getattr(self, attr)
class EventModel(Model):
"""
Model for event components.
"""
def __init__(self, obj_type: Type[Event], **kwargs):
super().__init__(obj_type, **kwargs)
class ResponseModel(Model):
"""
Model for response components.
"""
def __init__(self, obj_type: Type[Response], **kwargs):
super().__init__(obj_type, **kwargs)
class ActionModel(Model):
"""
Model for plugin action components.
"""
def __init__(self, obj_type: Type[Callable], *args, **kwargs):
super().__init__(obj_type, name=obj_type.__name__, *args, **kwargs)

View File

@ -1,18 +0,0 @@
from ._backend import BackendParser
from ._base import Parser
from ._event import EventParser
from ._method import MethodParser
from ._plugin import PluginParser
from ._response import ResponseParser
from ._schema import SchemaParser
__all__ = [
'BackendParser',
'EventParser',
'MethodParser',
'Parser',
'PluginParser',
'ResponseParser',
'SchemaParser',
]

View File

@ -1,32 +0,0 @@
import re
from ._base import Parser
class BackendParser(Parser):
"""
Parse backend references in the docstrings with rendered links to their
respective documentation.
"""
_backend_regex = re.compile(
r'(\s*):class:`(platypush\.backend\.(.+?))`', re.MULTILINE
)
@classmethod
def parse(cls, docstring: str, *_, **__) -> str:
while True:
m = cls._backend_regex.search(docstring)
if not m:
break
class_name = m.group(3).split('.')[-1]
package = '.'.join(m.group(3).split('.')[:-1])
docstring = cls._backend_regex.sub(
f'{m.group(1)}`{class_name} '
f'<https://docs.platypush.tech/platypush/backend/{package}.html#{m.group(2)}>`_',
docstring,
count=1,
)
return docstring

View File

@ -1,12 +0,0 @@
from abc import ABC, abstractmethod
class Parser(ABC):
"""
Base class for parsers.
"""
@classmethod
@abstractmethod
def parse(cls, docstring: str, obj_type: type) -> str:
raise NotImplementedError()

View File

@ -1,32 +0,0 @@
import re
from ._base import Parser
class EventParser(Parser):
"""
Parse event references in the docstrings with rendered links to their
respective documentation.
"""
_event_regex = re.compile(
r'(\s*):class:`(platypush\.message\.event\.(.+?))`', re.MULTILINE
)
@classmethod
def parse(cls, docstring: str, *_, **__) -> str:
while True:
m = cls._event_regex.search(docstring)
if not m:
break
class_name = m.group(3).split('.')[-1]
package = '.'.join(m.group(3).split('.')[:-1])
docstring = cls._event_regex.sub(
f'{m.group(1)}`{class_name} '
f'<https://docs.platypush.tech/platypush/events/{package}.html#{m.group(2)}>`_',
docstring,
count=1,
)
return docstring

View File

@ -1,60 +0,0 @@
import re
from ._base import Parser
class MethodParser(Parser):
"""
Parse method references in the docstrings with rendered links to their
respective documentation.
"""
_abs_method_regex = re.compile(
r'(\s*):meth:`(platypush\.plugins\.(.+?))`', re.MULTILINE
)
_rel_method_regex = re.compile(r'(\s*):meth:`\.(.+?)`', re.MULTILINE)
@classmethod
def parse(cls, docstring: str, obj_type: type) -> str:
while True:
m = cls._rel_method_regex.search(docstring)
if m:
tokens = m.group(2).split('.')
method = tokens[-1]
package = obj_type.__module__
rel_package = '.'.join(package.split('.')[2:])
full_name = '.'.join(
[
package,
'.'.join(obj_type.__qualname__.split('.')[:-1]),
method,
]
)
docstring = cls._rel_method_regex.sub(
f'{m.group(1)}`{package}.{method} '
f'<https://docs.platypush.tech/platypush/plugins/{rel_package}.html#{full_name}>`_',
docstring,
count=1,
)
continue
m = cls._abs_method_regex.search(docstring)
if m:
tokens = m.group(3).split('.')
method = tokens[-1]
package = '.'.join(tokens[:-2])
docstring = cls._abs_method_regex.sub(
f'{m.group(1)}`{package}.{method} '
f'<https://docs.platypush.tech/platypush/plugins/{package}.html#{m.group(2)}>`_',
docstring,
count=1,
)
continue
break
return docstring

View File

@ -1,32 +0,0 @@
import re
from ._base import Parser
class PluginParser(Parser):
"""
Parse plugin references in the docstrings with rendered links to their
respective documentation.
"""
_plugin_regex = re.compile(
r'(\s*):class:`(platypush\.plugins\.(.+?))`', re.MULTILINE
)
@classmethod
def parse(cls, docstring: str, *_, **__) -> str:
while True:
m = cls._plugin_regex.search(docstring)
if not m:
break
class_name = m.group(3).split('.')[-1]
package = '.'.join(m.group(3).split('.')[:-1])
docstring = cls._plugin_regex.sub(
f'{m.group(1)}`{class_name} '
f'<https://docs.platypush.tech/platypush/plugins/{package}.html#{m.group(2)}>`_',
docstring,
count=1,
)
return docstring

View File

@ -1,32 +0,0 @@
import re
from ._base import Parser
class ResponseParser(Parser):
"""
Parse response references in the docstrings with rendered links to their
respective documentation.
"""
_response_regex = re.compile(
r'(\s*):class:`(platypush\.message\.response\.(.+?))`', re.MULTILINE
)
@classmethod
def parse(cls, docstring: str, *_, **__) -> str:
while True:
m = cls._response_regex.search(docstring)
if not m:
break
class_name = m.group(3).split('.')[-1]
package = '.'.join(m.group(3).split('.')[:-1])
docstring = cls._response_regex.sub(
f'{m.group(1)}`{class_name} '
f'<https://docs.platypush.tech/platypush/responses/{package}.html#{m.group(2)}>`_',
docstring,
count=1,
)
return docstring

View File

@ -1,95 +0,0 @@
import importlib
import inspect
import json
import os
from random import randint
import re
import textwrap
from marshmallow import fields
import platypush.schemas
from ._base import Parser
class SchemaParser(Parser):
"""
Support for response/message schemas in the docs. Format: ``.. schema:: rel_path.SchemaClass(arg1=value1, ...)``,
where ``rel_path`` is the path of the schema relative to ``platypush/schemas``.
"""
_schemas_path = os.path.dirname(inspect.getfile(platypush.schemas))
_schema_regex = re.compile(
r'^(\s*)\.\.\s+schema::\s*([a-zA-Z0-9._]+)\s*(\((.+?)\))?', re.MULTILINE
)
@classmethod
def _get_field_value(cls, field):
metadata = getattr(field, 'metadata', {})
if metadata.get('example'):
return metadata['example']
if metadata.get('description'):
return metadata['description']
if isinstance(field, fields.Number):
return randint(1, 99)
if isinstance(field, fields.Boolean):
return bool(randint(0, 1))
if isinstance(field, fields.URL):
return 'https://example.org'
if isinstance(field, fields.List):
return [cls._get_field_value(field.inner)]
if isinstance(field, fields.Dict):
return {
cls._get_field_value(field.key_field)
if field.key_field
else 'key': cls._get_field_value(field.value_field)
if field.value_field
else 'value'
}
if isinstance(field, fields.Nested):
ret = {
name: cls._get_field_value(f)
for name, f in field.nested().fields.items()
}
return [ret] if field.many else ret
return str(field.__class__.__name__).lower()
@classmethod
def parse(cls, docstring: str, *_, **__) -> str:
while True:
m = cls._schema_regex.search(docstring)
if not m:
break
schema_module_name = '.'.join(
['platypush.schemas', *(m.group(2).split('.')[:-1])]
)
schema_module = importlib.import_module(schema_module_name)
schema_class = getattr(schema_module, m.group(2).split('.')[-1])
schema_args = eval(f'dict({m.group(4)})') if m.group(4) else {}
schema = schema_class(**schema_args)
parsed_schema = {
name: cls._get_field_value(field)
for name, field in schema.fields.items()
if not field.load_only
}
if schema.many:
parsed_schema = [parsed_schema]
padding = m.group(1)
docstring = cls._schema_regex.sub(
textwrap.indent('\n\n.. code-block:: json\n\n', padding)
+ textwrap.indent(
json.dumps(parsed_schema, sort_keys=True, indent=2),
padding + ' ',
).replace('\n\n', '\n')
+ '\n\n',
docstring,
)
return docstring