[WIP] Large refactor of the inspection plugin and models.

Fabio Manganiello 2023-10-04 02:27:09 +02:00
parent 841643f3ff
commit 608844ca0c
Signed by: blacklight
GPG key ID: D90FBA7F76362774
20 changed files with 483 additions and 304 deletions


@@ -3,7 +3,6 @@ import os
import re
import sys
import textwrap as tw
-from contextlib import contextmanager

from sphinx.application import Sphinx

@@ -13,14 +12,15 @@ base_path = os.path.abspath(
sys.path.insert(0, base_path)

-from platypush.utils import get_plugin_name_by_class # noqa
-from platypush.utils.mock import mock # noqa
-from platypush.utils.reflection import IntegrationMetadata, import_file # noqa
+from platypush.common.reflection import Integration # noqa
+from platypush.utils import get_plugin_name_by_class, import_file # noqa
+from platypush.utils.mock import auto_mocks # noqa
+from platypush.utils.mock.modules import mock_imports # noqa


class IntegrationEnricher:
    @staticmethod
-    def add_events(source: list[str], manifest: IntegrationMetadata, idx: int) -> int:
+    def add_events(source: list[str], manifest: Integration, idx: int) -> int:
        if not manifest.events:
            return idx

@@ -37,7 +37,7 @@
        return idx + 1

    @staticmethod
-    def add_actions(source: list[str], manifest: IntegrationMetadata, idx: int) -> int:
+    def add_actions(source: list[str], manifest: Integration, idx: int) -> int:
        if not (manifest.actions and manifest.cls):
            return idx

@@ -60,7 +60,7 @@
    @classmethod
    def add_install_deps(
-        cls, source: list[str], manifest: IntegrationMetadata, idx: int
+        cls, source: list[str], manifest: Integration, idx: int
    ) -> int:
        deps = manifest.deps
        parsed_deps = {

@@ -106,9 +106,7 @@ class IntegrationEnricher:
        return idx

    @classmethod
-    def add_description(
-        cls, source: list[str], manifest: IntegrationMetadata, idx: int
-    ) -> int:
+    def add_description(cls, source: list[str], manifest: Integration, idx: int) -> int:
        docs = (
            doc
            for doc in (

@@ -127,7 +125,7 @@ class IntegrationEnricher:
    @classmethod
    def add_conf_snippet(
-        cls, source: list[str], manifest: IntegrationMetadata, idx: int
+        cls, source: list[str], manifest: Integration, idx: int
    ) -> int:
        source.insert(
            idx,

@@ -163,8 +161,8 @@
        if not os.path.isfile(manifest_file):
            return

-        with mock_imports():
-            manifest = IntegrationMetadata.from_manifest(manifest_file)
+        with auto_mocks():
+            manifest = Integration.from_manifest(manifest_file)
            idx = self.add_description(src, manifest, idx=3)
            idx = self.add_conf_snippet(src, manifest, idx=idx)
            idx = self.add_install_deps(src, manifest, idx=idx)

@@ -175,14 +173,6 @@ class IntegrationEnricher:
        source[0] = '\n'.join(src)


-@contextmanager
-def mock_imports():
-    conf_mod = import_file(os.path.join(base_path, 'docs', 'source', 'conf.py'))
-    mock_mods = getattr(conf_mod, 'autodoc_mock_imports', [])
-    with mock(*mock_mods):
-        yield
-
-
def setup(app: Sphinx):
    app.connect('source-read', IntegrationEnricher())
    return {
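
Reviewer note (not part of the diff): the extension plugs into Sphinx's "source-read" event, whose handlers receive the docname and a single-element list holding the page source and mutate it in place. A minimal sketch of that pattern, with a made-up handler name:

from sphinx.application import Sphinx


def enrich(app: Sphinx, docname: str, source: list) -> None:
    # "source" is a single-element list; rebuilding source[0] is how the
    # enricher injects generated sections into each page.
    src = source[0].split('\n')
    src.insert(min(3, len(src)), '.. note:: generated content would go here')
    source[0] = '\n'.join(src)


def setup(app: Sphinx):
    app.connect('source-read', enrich)
    return {'parallel_read_safe': True}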


@@ -163,9 +163,9 @@ latex_documents = [
man_pages = [(master_doc, 'platypush', 'platypush Documentation', [author], 1)]

-# -- Options for Texinfo output ----------------------------------------------
+# -- Options for TexInfo output ----------------------------------------------

-# Grouping the document tree into Texinfo files. List of tuples
+# Grouping the document tree into TexInfo files. List of tuples
# (source start file, target name, title, author,
#  dir menu entry, description, category)
texinfo_documents = [

@@ -193,126 +193,25 @@ autodoc_default_options = {
    'show-inheritance': True,
}

-autodoc_mock_imports = [
-    'gunicorn',
-    'googlesamples.assistant.grpc.audio_helpers',
-    'google.assistant.embedded',
-    'google.assistant.library',
-    'google.assistant.library.event',
-    'google.assistant.library.file_helpers',
-    'google.oauth2.credentials',
-    'oauth2client',
-    'apiclient',
-    'tenacity',
-    'smartcard',
-    'Leap',
-    'oauth2client',
-    'rtmidi',
-    'bluetooth',
-    'gevent.wsgi',
-    'Adafruit_IO',
-    'pyclip',
-    'pydbus',
-    'inputs',
-    'inotify',
-    'omxplayer',
-    'plexapi',
-    'cwiid',
-    'sounddevice',
-    'soundfile',
-    'numpy',
-    'cv2',
-    'nfc',
-    'ndef',
-    'bcrypt',
-    'google',
-    'feedparser',
-    'kafka',
-    'googlesamples',
-    'icalendar',
-    'httplib2',
-    'mpd',
-    'serial',
-    'pyHS100',
-    'grpc',
-    'envirophat',
-    'gps',
-    'picamera',
-    'pmw3901',
-    'PIL',
-    'croniter',
-    'pyaudio',
-    'avs',
-    'PyOBEX',
-    'PyOBEX.client',
-    'todoist',
-    'trello',
-    'telegram',
-    'telegram.ext',
-    'pyfirmata2',
-    'cups',
-    'graphyte',
-    'cpuinfo',
-    'psutil',
-    'openzwave',
-    'deepspeech',
-    'wave',
-    'pvporcupine ',
-    'pvcheetah',
-    'pyotp',
-    'linode_api4',
-    'pyzbar',
-    'tensorflow',
-    'keras',
-    'pandas',
-    'samsungtvws',
-    'paramiko',
-    'luma',
-    'zeroconf',
-    'dbus',
-    'gi',
-    'gi.repository',
-    'twilio',
-    'Adafruit_Python_DHT',
-    'RPi.GPIO',
-    'RPLCD',
-    'imapclient',
-    'pysmartthings',
-    'aiohttp',
-    'watchdog',
-    'pyngrok',
-    'irc',
-    'irc.bot',
-    'irc.strings',
-    'irc.client',
-    'irc.connection',
-    'irc.events',
-    'defusedxml',
-    'nio',
-    'aiofiles',
-    'aiofiles.os',
-    'async_lru',
-    'bleak',
-    'bluetooth_numbers',
-    'TheengsDecoder',
-    'simple_websocket',
-    'uvicorn',
-    'websockets',
-    'docutils',
-    'aioxmpp',
-]

sys.path.insert(0, os.path.abspath('../..'))

+from platypush.utils.mock.modules import mock_imports # noqa
+
+autodoc_mock_imports = [*mock_imports]

-def skip(app, what, name, obj, skip, options):
+
+# _ = app
+# __ = what
+# ___ = obj
+# ____ = options
+def _skip(_, __, name, ___, skip, ____):
    if name == "__init__":
        return False
    return skip


def setup(app):
-    app.connect("autodoc-skip-member", skip)
+    app.connect("autodoc-skip-member", _skip)


# vim:sw=4:ts=4:et:


@@ -32,6 +32,14 @@ def parse_cmdline(args: Sequence[str]) -> argparse.Namespace:
        help='Custom working directory to be used for the application',
    )

+    parser.add_argument(
+        '--cachedir',
+        dest='cachedir',
+        required=False,
+        default=None,
+        help='Custom cache directory',
+    )
+
    parser.add_argument(
        '--device-id',
        '-d',


@@ -0,0 +1,6 @@
from ._model import Integration


__all__ = [
    "Integration",
]


@@ -0,0 +1,14 @@
from .action import Action
from .argument import Argument
from .constructor import Constructor
from .integration import Integration
from .returns import ReturnValue


__all__ = [
    "Action",
    "Argument",
    "Constructor",
    "Integration",
    "ReturnValue",
]


@@ -0,0 +1,7 @@
from .._parser import DocstringParser


class Action(DocstringParser):
    """
    Represents an integration action.
    """


@@ -0,0 +1,27 @@
from dataclasses import dataclass
from typing import Optional, Type

from .._serialize import Serializable
from .._utils import type_str


@dataclass
class Argument(Serializable):
    """
    Represents an integration constructor/action parameter.
    """

    name: str
    required: bool = False
    doc: Optional[str] = None
    type: Optional[Type] = None
    default: Optional[str] = None

    def to_dict(self) -> dict:
        return {
            "name": self.name,
            "required": self.required,
            "doc": self.doc,
            "type": type_str(self.type),
            "default": self.default,
        }
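
Reviewer note (illustrative only, not part of the diff): how the Argument model serializes, assuming the dataclass above; the field values are made up.

arg = Argument(name="host", required=True, doc="MPD server host", type=str)
print(arg.to_dict())
# {'name': 'host', 'required': True, 'doc': 'MPD server host',
#  'type': 'str', 'default': None}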


@@ -0,0 +1,23 @@
from typing import Union, Type, Callable

from .._parser import DocstringParser


class Constructor(DocstringParser):
    """
    Represents an integration constructor.
    """

    @classmethod
    def parse(cls, obj: Union[Type, Callable]) -> "Constructor":
        """
        Parse the parameters of a class constructor or action method.

        :param obj: Base type of the object.
        :return: The parsed parameters.
        """
        init = getattr(obj, "__init__", None)
        if init and callable(init):
            return super().parse(init)

        return super().parse(obj)
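
Reviewer note (illustrative, hypothetical class): Constructor.parse delegates to DocstringParser.parse on __init__ when one is defined, and falls back to the object itself otherwise.

class Dummy:
    def __init__(self, host: str, port: int = 6600):
        """
        :param host: Server host.
        :param port: Server port (default: 6600).
        """


ctor = Constructor.parse(Dummy)   # parses Dummy.__init__
print(sorted(ctor.args))          # expected: ['host', 'port']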


@@ -4,49 +4,23 @@ import os
import re
import textwrap as tw
from dataclasses import dataclass, field
-from importlib.machinery import SourceFileLoader
-from importlib.util import spec_from_loader, module_from_spec
-from typing import Optional, Type, Union, Callable, Dict, Set
+from typing import Type, Optional, Dict, Set

from platypush.utils import (
    get_backend_class_by_name,
-    get_backend_name_by_class,
    get_plugin_class_by_name,
    get_plugin_name_by_class,
+    get_backend_name_by_class,
    get_decorators,
)
from platypush.utils.manifest import Manifest, ManifestType, Dependencies
-from platypush.utils.reflection._parser import DocstringParser, Parameter

+from .._serialize import Serializable
+from . import Constructor, Action, Argument

-class Action(DocstringParser):
-    """
-    Represents an integration action.
-    """
-
-
-class Constructor(DocstringParser):
-    """
-    Represents an integration constructor.
-    """
-
-    @classmethod
-    def parse(cls, obj: Union[Type, Callable]) -> "Constructor":
-        """
-        Parse the parameters of a class constructor or action method.
-
-        :param obj: Base type of the object.
-        :return: The parsed parameters.
-        """
-        init = getattr(obj, "__init__", None)
-        if init and callable(init):
-            return super().parse(init)
-
-        return super().parse(obj)
-

@dataclass
-class IntegrationMetadata:
+class Integration(Serializable):
    """
    Represents the metadata of an integration (plugin or backend).
    """

@@ -65,8 +39,25 @@ class IntegrationMetadata:
        if not self._skip_manifest:
            self._init_manifest()

+    def to_dict(self) -> dict:
+        return {
+            "name": self.name,
+            "type": f'{self.type.__module__}.{self.type.__qualname__}',
+            "doc": self.doc,
+            "args": {
+                **(
+                    {name: arg.to_dict() for name, arg in self.constructor.args.items()}
+                    if self.constructor
+                    else {}
+                ),
+            },
+            "actions": {k: v.to_dict() for k, v in self.actions.items()},
+            "events": [f'{e.__module__}.{e.__qualname__}' for e in self.events],
+            "deps": self.deps.to_dict(),
+        }
+
    @staticmethod
-    def _merge_params(params: Dict[str, Parameter], new_params: Dict[str, Parameter]):
+    def _merge_params(params: Dict[str, Argument], new_params: Dict[str, Argument]):
        """
        Utility function to merge a new mapping of parameters into an existing one.
        """

@@ -104,7 +95,7 @@ class IntegrationMetadata:
            actions[action_name].doc = action.doc
            # Merge the parameters
-            cls._merge_params(actions[action_name].params, action.params)
+            cls._merge_params(actions[action_name].args, action.args)

    @classmethod
    def _merge_events(cls, events: Set[Type], new_events: Set[Type]):

@@ -114,7 +105,7 @@ class IntegrationMetadata:
        events.update(new_events)

    @classmethod
-    def by_name(cls, name: str) -> "IntegrationMetadata":
+    def by_name(cls, name: str) -> "Integration":
        """
        :param name: Integration name.
        :return: A parsed Integration class given its type.

@@ -127,7 +118,7 @@ class IntegrationMetadata:
        return cls.by_type(type)

    @classmethod
-    def by_type(cls, type: Type, _skip_manifest: bool = False) -> "IntegrationMetadata":
+    def by_type(cls, type: Type, _skip_manifest: bool = False) -> "Integration":
        """
        :param type: Integration type (plugin or backend).
        :param _skip_manifest: Whether we should skip parsing the manifest file for this integration

@@ -167,7 +158,7 @@ class IntegrationMetadata:
            p_obj = cls.by_type(p_type, _skip_manifest=True)
            # Merge constructor parameters
            if obj.constructor and p_obj.constructor:
-                cls._merge_params(obj.constructor.params, p_obj.constructor.params)
+                cls._merge_params(obj.constructor.args, p_obj.constructor.args)

            # Merge actions
            cls._merge_actions(obj.actions, p_obj.actions)

@@ -194,8 +185,24 @@ class IntegrationMetadata:
        return getter(".".join(self.manifest.package.split(".")[2:]))

+    @property
+    def base_type(self) -> Type:
+        """
+        :return: The base type of this integration, either :class:`platypush.backend.Backend` or
+            :class:`platypush.plugins.Plugin`.
+        """
+        from platypush.backend import Backend
+        from platypush.plugins import Plugin
+
+        if issubclass(self.cls, Plugin):
+            return Plugin
+
+        if issubclass(self.cls, Backend):
+            return Backend
+
+        raise RuntimeError(f"Unknown base type for {self.cls}")
+
    @classmethod
-    def from_manifest(cls, manifest_file: str) -> "IntegrationMetadata":
+    def from_manifest(cls, manifest_file: str) -> "Integration":
        """
        Create an `IntegrationMetadata` object from a manifest file.

@@ -302,27 +309,9 @@ class IntegrationMetadata:
                else ""
            )
            + "\n"
-            for name, param in self.constructor.params.items()
+            for name, param in self.constructor.args.items()
        )
-        if self.constructor and self.constructor.params
+        if self.constructor and self.constructor.args
        else " # No configuration required\n"
        )
    )

-
-def import_file(path: str, name: Optional[str] = None):
-    """
-    Import a Python file as a module, even if no __init__.py is
-    defined in the directory.
-
-    :param path: Path of the file to import.
-    :param name: Custom name for the imported module (default: same as the file's basename).
-    :return: The imported module.
-    """
-    name = name or re.split(r"\.py$", os.path.basename(path))[0]
-    loader = SourceFileLoader(name, os.path.expanduser(path))
-    mod_spec = spec_from_loader(name, loader)
-    assert mod_spec, f"Cannot create module specification for {path}"
-    mod = module_from_spec(mod_spec)
-    loader.exec_module(mod)
-    return mod
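
Reviewer note (rough sketch, not part of the diff): how the renamed model is meant to be consumed downstream; the plugin name is only an example, and auto_mocks is used to stand in for optional third-party imports.

from platypush.common.reflection import Integration
from platypush.utils.mock import auto_mocks

with auto_mocks():
    meta = Integration.by_name("light.hue").to_dict()

print(meta["name"])          # e.g. "light.hue"
print(sorted(meta["args"]))  # constructor argument names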


@@ -0,0 +1,21 @@
from dataclasses import dataclass
from typing import Optional, Type

from .._serialize import Serializable
from .._utils import type_str


@dataclass
class ReturnValue(Serializable):
    """
    Represents the return value of an action.
    """

    doc: Optional[str] = None
    type: Optional[Type] = None

    def to_dict(self) -> dict:
        return {
            "doc": self.doc,
            "type": type_str(self.type),
        }


@@ -0,0 +1,6 @@
from .docstring import DocstringParser


__all__ = [
    "DocstringParser",
]


@@ -0,0 +1,48 @@
import inspect
import textwrap as tw
from dataclasses import dataclass, field
from typing import Callable, Optional, Iterable, Tuple, Any, Type, get_type_hints

from .._model.argument import Argument
from .._model.returns import ReturnValue
from .state import ParseState


@dataclass
class ParseContext:
    """
    Runtime parsing context.
    """

    obj: Callable
    state: ParseState = ParseState.DOC
    cur_param: Optional[str] = None
    doc: Optional[str] = None
    returns: ReturnValue = field(default_factory=ReturnValue)
    parsed_params: dict[str, Argument] = field(default_factory=dict)

    def __post_init__(self):
        annotations = getattr(self.obj, "__annotations__", {})
        if annotations:
            self.returns.type = annotations.get("return")

    @property
    def spec(self) -> inspect.FullArgSpec:
        return inspect.getfullargspec(self.obj)

    @property
    def param_names(self) -> Iterable[str]:
        return self.spec.args[1:]

    @property
    def param_defaults(self) -> Tuple[Any]:
        defaults = self.spec.defaults or ()
        return ((Any,) * (len(self.spec.args[1:]) - len(defaults))) + defaults

    @property
    def param_types(self) -> dict[str, Type]:
        return get_type_hints(self.obj)

    @property
    def doc_lines(self) -> Iterable[str]:
        return tw.dedent(inspect.getdoc(self.obj) or "").split("\n")
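
Reviewer note (illustrative, made-up function): the properties above are thin wrappers around inspect.getfullargspec() and typing.get_type_hints(), e.g.:

import inspect
from typing import get_type_hints


def connect(self, host: str, port: int = 6600):
    """Hypothetical method."""


spec = inspect.getfullargspec(connect)
print(spec.args[1:])            # ['host', 'port']  -> param_names
print(spec.defaults)            # (6600,)           -> feeds param_defaults
print(get_type_hints(connect))  # {'host': <class 'str'>, 'port': <class 'int'>}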


@@ -1,97 +1,16 @@
-import inspect
import re
import textwrap as tw
from contextlib import contextmanager
-from dataclasses import dataclass, field
-from enum import IntEnum
-from typing import (
-    Any,
-    Optional,
-    Iterable,
-    Type,
-    get_type_hints,
-    Callable,
-    Tuple,
-    Generator,
-    Dict,
-)
+from typing import Optional, Dict, Callable, Generator, Any
+
+from .._model.argument import Argument
+from .._model.returns import ReturnValue
+from .._serialize import Serializable
+from .context import ParseContext
+from .state import ParseState

-@dataclass
class ReturnValue:
"""
Represents the return value of an action.
"""
doc: Optional[str] = None
type: Optional[Type] = None
@dataclass
class Parameter:
"""
Represents an integration constructor/action parameter.
"""
name: str
required: bool = False
doc: Optional[str] = None
type: Optional[Type] = None
default: Optional[str] = None
class ParseState(IntEnum):
"""
Parse state.
"""
DOC = 0
PARAM = 1
TYPE = 2
RETURN = 3
@dataclass
class ParseContext:
"""
Runtime parsing context.
"""
obj: Callable
state: ParseState = ParseState.DOC
cur_param: Optional[str] = None
doc: Optional[str] = None
returns: ReturnValue = field(default_factory=ReturnValue)
parsed_params: dict[str, Parameter] = field(default_factory=dict)
def __post_init__(self):
annotations = getattr(self.obj, "__annotations__", {})
if annotations:
self.returns.type = annotations.get("return")
@property
def spec(self) -> inspect.FullArgSpec:
return inspect.getfullargspec(self.obj)
@property
def param_names(self) -> Iterable[str]:
return self.spec.args[1:]
@property
def param_defaults(self) -> Tuple[Any]:
defaults = self.spec.defaults or ()
return ((Any,) * (len(self.spec.args[1:]) - len(defaults))) + defaults
@property
def param_types(self) -> dict[str, Type]:
return get_type_hints(self.obj)
@property
def doc_lines(self) -> Iterable[str]:
return tw.dedent(inspect.getdoc(self.obj) or "").split("\n")
-class DocstringParser:
+class DocstringParser(Serializable):
    """
    Mixin for objects that can parse docstrings.
    """

@@ -105,25 +24,30 @@ class DocstringParser:
        self,
        name: str,
        doc: Optional[str] = None,
-        params: Optional[Dict[str, Parameter]] = None,
+        args: Optional[Dict[str, Argument]] = None,
+        has_varargs: bool = False,
+        has_kwargs: bool = False,
        returns: Optional[ReturnValue] = None,
    ):
        self.name = name
        self.doc = doc
-        self.params = params or {}
+        self.args = args or {}
+        self.has_varargs = has_varargs
+        self.has_kwargs = has_kwargs
        self.returns = returns

-    @classmethod
-    @contextmanager
-    def _parser(cls, obj: Callable) -> Generator[ParseContext, None, None]:
-        """
-        Manages the parsing context manager.
-
-        :param obj: Method to parse.
-        :return: The parsing context.
-        """
-
-        def norm_indent(text: Optional[str]) -> Optional[str]:
+    def to_dict(self) -> dict:
+        return {
+            "name": self.name,
+            "doc": self.doc,
+            "args": {k: v.to_dict() for k, v in self.args.items()},
+            "has_varargs": self.has_varargs,
+            "has_kwargs": self.has_kwargs,
+            "returns": self.returns.to_dict() if self.returns else None,
+        }
+
+    @staticmethod
+    def _norm_indent(text: Optional[str]) -> Optional[str]:
        """
        Normalize the indentation of a docstring.

@@ -136,15 +60,25 @@ class DocstringParser:
        lines = text.split("\n")
        return (lines[0] + "\n" + tw.dedent("\n".join(lines[1:]) or "")).strip()

+    @classmethod
+    @contextmanager
+    def _parser(cls, obj: Callable) -> Generator[ParseContext, None, None]:
+        """
+        Manages the parsing context manager.
+
+        :param obj: Method to parse.
+        :return: The parsing context.
+        """
        ctx = ParseContext(obj)
        yield ctx

        # Normalize the parameters docstring indentation
        for param in ctx.parsed_params.values():
-            param.doc = norm_indent(param.doc)
+            param.doc = cls._norm_indent(param.doc)

        # Normalize the return docstring indentation
-        ctx.returns.doc = norm_indent(ctx.returns.doc)
+        ctx.returns.doc = cls._norm_indent(ctx.returns.doc)

    @staticmethod
    def _is_continuation_line(line: str) -> bool:

@@ -189,7 +123,7 @@ class DocstringParser:
        if ctx.cur_param in {ctx.spec.varkw, ctx.spec.varargs}:
            return

-        ctx.parsed_params[ctx.cur_param] = Parameter(
+        ctx.parsed_params[ctx.cur_param] = Argument(
            name=ctx.cur_param,
            required=(
                idx >= len(ctx.param_defaults) or ctx.param_defaults[idx] is Any

@@ -236,6 +170,8 @@ class DocstringParser:
        return cls(
            name=obj.__name__,
            doc=ctx.doc,
-            params=ctx.parsed_params,
+            args=ctx.parsed_params,
+            has_varargs=ctx.spec.varargs is not None,
+            has_kwargs=ctx.spec.varkw is not None,
            returns=ctx.returns,
        )
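
Reviewer note (illustrative usage on a made-up method, assuming the parse() classmethod shown above):

def fetch(self, url: str, timeout: float = 10.0, **kwargs):
    """
    Fetch a resource.

    :param url: Resource URL.
    :param timeout: Request timeout in seconds.
    :return: The response body.
    """


parsed = DocstringParser.parse(fetch)
print(parsed.args["timeout"].required)  # False (it has a default)
print(parsed.has_kwargs)                # True (**kwargs is present)
print(parsed.returns.doc)               # e.g. "The response body."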


@@ -0,0 +1,12 @@
from enum import IntEnum


class ParseState(IntEnum):
    """
    Parse state.
    """

    DOC = 0
    PARAM = 1
    TYPE = 2
    RETURN = 3


@@ -0,0 +1,14 @@
from abc import ABC, abstractmethod


class Serializable(ABC):
    """
    Base class for reflection entities that can be serialized to JSON/YAML.
    """

    @abstractmethod
    def to_dict(self) -> dict:
        """
        Serialize the entity into a dictionary.
        """
        raise NotImplementedError()
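
Reviewer note (minimal, hypothetical implementation of the contract):

class Version(Serializable):
    def __init__(self, major: int, minor: int):
        self.major = major
        self.minor = minor

    def to_dict(self) -> dict:
        return {"major": self.major, "minor": self.minor}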


@@ -0,0 +1,12 @@
import re
from typing import Optional, Type


def type_str(t: Optional[Type]) -> Optional[str]:
    """
    :return: A human-readable representation of a type.
    """
    if not t:
        return None

    return re.sub(r"<class '(.*)'>", r'\1', str(t).replace('typing.', ''))
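
Reviewer note (a couple of illustrative conversions):

from typing import Optional

print(type_str(int))            # 'int'
print(type_str(Optional[int]))  # 'Optional[int]'
print(type_str(None))           # None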


@@ -13,6 +13,8 @@ import socket
import ssl
import time
import urllib.request
+from importlib.machinery import SourceFileLoader
+from importlib.util import spec_from_loader, module_from_spec
from multiprocessing import Lock as PLock
from tempfile import gettempdir
from threading import Lock as TLock

@@ -86,7 +88,7 @@ def get_backend_module_by_name(backend_name):
    return None

-def get_plugin_class_by_name(plugin_name):
+def get_plugin_class_by_name(plugin_name) -> Optional[type]:
    """Gets the class of a plugin by name (e.g. "music.mpd" or "media.vlc")"""
    module = get_plugin_module_by_name(plugin_name)

@@ -123,7 +125,7 @@ def get_plugin_name_by_class(plugin) -> Optional[str]:
    return '.'.join(class_tokens)

-def get_backend_class_by_name(backend_name: str):
+def get_backend_class_by_name(backend_name: str) -> Optional[type]:
    """Gets the class of a backend by name (e.g. "backend.http" or "backend.mqtt")"""
    module = get_backend_module_by_name(backend_name)

@@ -685,4 +687,22 @@ def get_message_response(msg):
    return response

+def import_file(path: str, name: Optional[str] = None):
+    """
+    Import a Python file as a module, even if no __init__.py is
+    defined in the directory.
+
+    :param path: Path of the file to import.
+    :param name: Custom name for the imported module (default: same as the file's basename).
+    :return: The imported module.
+    """
+    name = name or re.split(r"\.py$", os.path.basename(path))[0]
+    loader = SourceFileLoader(name, os.path.expanduser(path))
+    mod_spec = spec_from_loader(name, loader)
+    assert mod_spec, f"Cannot create module specification for {path}"
+    mod = module_from_spec(mod_spec)
+    loader.exec_module(mod)
+    return mod
+
+
# vim:sw=4:ts=4:et:
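
Reviewer note (hypothetical usage of the relocated helper; the path is just an example):

from platypush.utils import import_file

conf = import_file('~/projects/platypush/docs/source/conf.py')
print(getattr(conf, 'project', None))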


@@ -273,6 +273,14 @@ class Dependencies:
    by_pkg_manager: Dict[PackageManagers, Set[str]] = field(default_factory=dict)
    """ All system dependencies, grouped by package manager. """

+    def to_dict(self):
+        return {
+            'before': self.before,
+            'packages': list(self.packages),
+            'pip': self.pip,
+            'after': self.after,
+        }
+
    @property
    def _is_venv(self) -> bool:
        """

@@ -517,6 +525,17 @@ class Manifest(ABC):
        :return: The type of the manifest.
        """

+    @property
+    def file(self) -> str:
+        """
+        :return: The path to the manifest file.
+        """
+        return os.path.join(
+            get_src_root(),
+            *self.package.split('.')[1:],
+            'manifest.yaml',
+        )
+
    def _init_deps(self, install: Mapping[str, Iterable[str]]) -> Dependencies:
        deps = Dependencies()
        for key, items in install.items():


@@ -6,6 +6,8 @@ from importlib.machinery import ModuleSpec
from types import ModuleType
from typing import Any, Iterator, Sequence, Generator, Optional, List

+from .modules import mock_imports
+

class MockObject:
    """

@@ -137,7 +139,7 @@ class MockModule(ModuleType):
class MockFinder(MetaPathFinder):
    """A finder for mocking."""

-    def __init__(self, modules: Sequence[str]) -> None:
+    def __init__(self, modules: Sequence[str]) -> None:  # noqa
        super().__init__()
        self.modules = modules
        self.loader = MockLoader(self)

@@ -178,7 +180,7 @@ class MockLoader(Loader):
@contextmanager
-def mock(*modules: str) -> Generator[None, None, None]:
+def mock(*mods: str) -> Generator[None, None, None]:
    """
    Insert mock modules during context::

@@ -188,10 +190,25 @@ def mock(*modules: str) -> Generator[None, None, None]:
    """
    finder = None
    try:
-        finder = MockFinder(modules)
+        finder = MockFinder(mods)
        sys.meta_path.insert(0, finder)
        yield
    finally:
        if finder:
            sys.meta_path.remove(finder)
            finder.invalidate_caches()

+
+@contextmanager
+def auto_mocks():
+    """
+    Automatically mock all the modules listed in ``mock_imports``.
+    """
+    with mock(*mock_imports):
+        yield
+
+
+__all__ = [
+    "auto_mocks",
+    "mock",
+]


@@ -0,0 +1,111 @@
mock_imports = [
"Adafruit_IO",
"Adafruit_Python_DHT",
"Leap",
"PIL",
"PyOBEX",
"PyOBEX.client",
"RPLCD",
"RPi.GPIO",
"TheengsDecoder",
"aiofiles",
"aiofiles.os",
"aiohttp",
"aioxmpp",
"apiclient",
"async_lru",
"avs",
"bcrypt",
"bleak",
"bluetooth",
"bluetooth_numbers",
"cpuinfo",
"croniter",
"cups",
"cv2",
"cwiid",
"dbus",
"deepspeech",
"defusedxml",
"docutils",
"envirophat",
"feedparser",
"gevent.wsgi",
"gi",
"gi.repository",
"google",
"google.assistant.embedded",
"google.assistant.library",
"google.assistant.library.event",
"google.assistant.library.file_helpers",
"google.oauth2.credentials",
"googlesamples",
"googlesamples.assistant.grpc.audio_helpers",
"gps",
"graphyte",
"grpc",
"gunicorn",
"httplib2",
"icalendar",
"imapclient",
"inotify",
"inputs",
"irc",
"irc.bot",
"irc.client",
"irc.connection",
"irc.events",
"irc.strings",
"kafka",
"keras",
"linode_api4",
"luma",
"mpd",
"ndef",
"nfc",
"nio",
"numpy",
"oauth2client",
"oauth2client",
"omxplayer",
"openzwave",
"pandas",
"paramiko",
"picamera",
"plexapi",
"pmw3901",
"psutil",
"pvcheetah",
"pvporcupine ",
"pyHS100",
"pyaudio",
"pyclip",
"pydbus",
"pyfirmata2",
"pyngrok",
"pyotp",
"pysmartthings",
"pyzbar",
"rtmidi",
"samsungtvws",
"serial",
"simple_websocket",
"smartcard",
"sounddevice",
"soundfile",
"telegram",
"telegram.ext",
"tenacity",
"tensorflow",
"todoist",
"trello",
"twilio",
"uvicorn",
"watchdog",
"wave",
"websockets",
"zeroconf",
]
"""
List of modules that should be mocked when building the documentation or running tests.
"""