diff --git a/platypush/plugins/inspect/__init__.py b/platypush/plugins/inspect/__init__.py index 5a9c1761..5b57cb16 100644 --- a/platypush/plugins/inspect/__init__.py +++ b/platypush/plugins/inspect/__init__.py @@ -2,175 +2,32 @@ import importlib import inspect import json import pkgutil -import re import threading -from abc import ABC, abstractmethod from typing import Optional -import platypush.backend # lgtm [py/import-and-import-from] -import platypush.plugins # lgtm [py/import-and-import-from] -import platypush.message.event # lgtm [py/import-and-import-from] -import platypush.message.response # lgtm [py/import-and-import-from] +import platypush.backend # lgtm [py/import-and-import-from] +import platypush.plugins # lgtm [py/import-and-import-from] +import platypush.message.event # lgtm [py/import-and-import-from] +import platypush.message.response # lgtm [py/import-and-import-from] from platypush.backend import Backend from platypush.config import Config from platypush.plugins import Plugin, action from platypush.message.event import Event from platypush.message.response import Response -from platypush.utils import get_decorators - -class Model(ABC): - def __str__(self): - return json.dumps(dict(self), indent=2, sort_keys=True) - - def __repr__(self): - return json.dumps(dict(self)) - - @staticmethod - def to_html(doc): - try: - import docutils.core - except ImportError: - # docutils not found - return doc - - return docutils.core.publish_parts(doc, writer_name='html')['html_body'] - - @abstractmethod - def __iter__(self): - raise NotImplementedError() - - -class ProcedureEncoder(json.JSONEncoder): - def default(self, o): - if callable(o): - return { - 'type': 'native_function', - } - - return super().default(o) - - -class BackendModel(Model): - def __init__(self, backend, prefix='', html_doc: Optional[bool] = False): - self.name = backend.__module__[len(prefix):] - self.html_doc = html_doc - self.doc = self.to_html(backend.__doc__) if html_doc and backend.__doc__ else backend.__doc__ - - def __iter__(self): - for attr in ['name', 'doc', 'html_doc']: - yield attr, getattr(self, attr) - - -class PluginModel(Model): - def __init__(self, plugin, prefix='', html_doc: Optional[bool] = False): - self.name = plugin.__module__[len(prefix):] - self.html_doc = html_doc - self.doc = self.to_html(plugin.__doc__) if html_doc and plugin.__doc__ else plugin.__doc__ - self.actions = {action_name: ActionModel(getattr(plugin, action_name), html_doc=html_doc or False) - for action_name in get_decorators(plugin, climb_class_hierarchy=True).get('action', [])} - - def __iter__(self): - for attr in ['name', 'actions', 'doc', 'html_doc']: - if attr == 'actions': - # noinspection PyShadowingNames - yield attr, {name: dict(action) for name, action in self.actions.items()}, - else: - yield attr, getattr(self, attr) - - -class EventModel(Model): - def __init__(self, event, prefix='', html_doc: Optional[bool] = False): - self.package = event.__module__[len(prefix):] - self.name = event.__name__ - self.html_doc = html_doc - self.doc = self.to_html(event.__doc__) if html_doc and event.__doc__ else event.__doc__ - - def __iter__(self): - for attr in ['name', 'doc', 'html_doc']: - yield attr, getattr(self, attr) - - -class ResponseModel(Model): - def __init__(self, response, prefix='', html_doc: Optional[bool] = False): - self.package = response.__module__[len(prefix):] - self.name = response.__name__ - self.html_doc = html_doc - self.doc = self.to_html(response.__doc__) if html_doc and response.__doc__ else response.__doc__ - - def __iter__(self): - for attr in ['name', 'doc', 'html_doc']: - yield attr, getattr(self, attr) - - -class ActionModel(Model): - # noinspection PyShadowingNames - def __init__(self, action, html_doc: bool = False): - self.name = action.__name__ - self.doc, argsdoc = self._parse_docstring(action.__doc__, html_doc=html_doc) - self.args = {} - self.has_kwargs = False - - for arg in list(inspect.signature(action).parameters.values())[1:]: - if arg.kind == arg.VAR_KEYWORD: - self.has_kwargs = True - continue - - self.args[arg.name] = { - 'default': arg.default if not issubclass(arg.default.__class__, type) else None, - 'doc': argsdoc.get(arg.name) - } - - @classmethod - def _parse_docstring(cls, docstring: str, html_doc: bool = False): - new_docstring = '' - params = {} - cur_param = None - cur_param_docstring = '' - - if not docstring: - return None, {} - - for line in docstring.split('\n'): - m = re.match(r'^\s*:param ([^:]+):\s*(.*)', line) - if m: - if cur_param: - params[cur_param] = cls.to_html(cur_param_docstring) if html_doc else cur_param_docstring - - cur_param = m.group(1) - cur_param_docstring = m.group(2) - elif re.match(r'^\s*:[^:]+:\s*.*', line): - continue - else: - if cur_param: - if not line.strip(): - params[cur_param] = cls.to_html(cur_param_docstring) if html_doc else cur_param_docstring - cur_param = None - cur_param_docstring = '' - else: - cur_param_docstring += '\n' + line.strip() - else: - new_docstring += line.rstrip() + '\n' - - if cur_param: - params[cur_param] = cls.to_html(cur_param_docstring) if html_doc else cur_param_docstring - - return new_docstring.strip() if not html_doc else cls.to_html(new_docstring), params - - def __iter__(self): - for attr in ['name', 'args', 'doc', 'has_kwargs']: - yield attr, getattr(self, attr) +from ._model import ( + BackendModel, + EventModel, + PluginModel, + ProcedureEncoder, + ResponseModel, +) class InspectPlugin(Plugin): """ This plugin can be used to inspect platypush plugins and backends - - Requires: - - * **docutils** (``pip install docutils``) - optional, for HTML doc generation - """ def __init__(self, **kwargs): @@ -189,19 +46,20 @@ class InspectPlugin(Plugin): package = platypush.plugins prefix = package.__name__ + '.' - for _, modname, _ in pkgutil.walk_packages(path=package.__path__, - prefix=prefix, - onerror=lambda _: None): + for _, modname, _ in pkgutil.walk_packages( + path=package.__path__, prefix=prefix, onerror=lambda _: None + ): try: module = importlib.import_module(modname) except Exception as e: - self.logger.warning(f'Could not import module {modname}') - self.logger.exception(e) + self.logger.warning('Could not import module %s: %s', modname, e) continue for _, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass(obj, Plugin): - model = PluginModel(plugin=obj, prefix=prefix, html_doc=self._html_doc) + model = PluginModel( + plugin=obj, prefix=prefix, html_doc=self._html_doc + ) if model.name: self._plugins[model.name] = model @@ -209,18 +67,20 @@ class InspectPlugin(Plugin): package = platypush.backend prefix = package.__name__ + '.' - for _, modname, _ in pkgutil.walk_packages(path=package.__path__, - prefix=prefix, - onerror=lambda _: None): + for _, modname, _ in pkgutil.walk_packages( + path=package.__path__, prefix=prefix, onerror=lambda _: None + ): try: module = importlib.import_module(modname) except Exception as e: - self.logger.debug(f'Could not import module {modname}: {str(e)}') + self.logger.debug('Could not import module %s: %s', modname, e) continue for _, obj in inspect.getmembers(module): if inspect.isclass(obj) and issubclass(obj, Backend): - model = BackendModel(backend=obj, prefix=prefix, html_doc=self._html_doc) + model = BackendModel( + backend=obj, prefix=prefix, html_doc=self._html_doc + ) if model.name: self._backends[model.name] = model @@ -228,21 +88,23 @@ class InspectPlugin(Plugin): package = platypush.message.event prefix = package.__name__ + '.' - for _, modname, _ in pkgutil.walk_packages(path=package.__path__, - prefix=prefix, - onerror=lambda _: None): + for _, modname, _ in pkgutil.walk_packages( + path=package.__path__, prefix=prefix, onerror=lambda _: None + ): try: module = importlib.import_module(modname) except Exception as e: - self.logger.debug(f'Could not import module {modname}: {str(e)}') + self.logger.debug('Could not import module %s: %s', modname, e) continue for _, obj in inspect.getmembers(module): - if type(obj) == Event: + if type(obj) == Event: # pylint: disable=unidiomatic-typecheck continue if inspect.isclass(obj) and issubclass(obj, Event) and obj != Event: - event = EventModel(event=obj, html_doc=self._html_doc, prefix=prefix) + event = EventModel( + event=obj, html_doc=self._html_doc, prefix=prefix + ) if event.package not in self._events: self._events[event.package] = {event.name: event} else: @@ -252,21 +114,27 @@ class InspectPlugin(Plugin): package = platypush.message.response prefix = package.__name__ + '.' - for _, modname, _ in pkgutil.walk_packages(path=package.__path__, - prefix=prefix, - onerror=lambda _: None): + for _, modname, _ in pkgutil.walk_packages( + path=package.__path__, prefix=prefix, onerror=lambda _: None + ): try: module = importlib.import_module(modname) except Exception as e: - self.logger.debug(f'Could not import module {modname}: {str(e)}') + self.logger.debug('Could not import module %s: %s', modname, e) continue for _, obj in inspect.getmembers(module): - if type(obj) == Response: + if type(obj) == Response: # pylint: disable=unidiomatic-typecheck continue - if inspect.isclass(obj) and issubclass(obj, Response) and obj != Response: - response = ResponseModel(response=obj, html_doc=self._html_doc, prefix=prefix) + if ( + inspect.isclass(obj) + and issubclass(obj, Response) + and obj != Response + ): + response = ResponseModel( + response=obj, html_doc=self._html_doc, prefix=prefix + ) if response.package not in self._responses: self._responses[response.package] = {response.name: response} else: @@ -278,14 +146,15 @@ class InspectPlugin(Plugin): :param html_doc: If True then the docstring will be parsed into HTML (default: False) """ with self._plugins_lock: - if not self._plugins or (html_doc is not None and html_doc != self._html_doc): + if not self._plugins or ( + html_doc is not None and html_doc != self._html_doc + ): self._html_doc = html_doc self._init_plugins() - return json.dumps({ - name: dict(plugin) - for name, plugin in self._plugins.items() - }) + return json.dumps( + {name: dict(plugin) for name, plugin in self._plugins.items()} + ) @action def get_all_backends(self, html_doc: Optional[bool] = None): @@ -293,14 +162,15 @@ class InspectPlugin(Plugin): :param html_doc: If True then the docstring will be parsed into HTML (default: False) """ with self._backends_lock: - if not self._backends or (html_doc is not None and html_doc != self._html_doc): + if not self._backends or ( + html_doc is not None and html_doc != self._html_doc + ): self._html_doc = html_doc self._init_backends() - return json.dumps({ - name: dict(backend) - for name, backend in self._backends.items() - }) + return json.dumps( + {name: dict(backend) for name, backend in self._backends.items()} + ) @action def get_all_events(self, html_doc: Optional[bool] = None): @@ -308,17 +178,18 @@ class InspectPlugin(Plugin): :param html_doc: If True then the docstring will be parsed into HTML (default: False) """ with self._events_lock: - if not self._events or (html_doc is not None and html_doc != self._html_doc): + if not self._events or ( + html_doc is not None and html_doc != self._html_doc + ): self._html_doc = html_doc self._init_events() - return json.dumps({ - package: { - name: dict(event) - for name, event in events.items() + return json.dumps( + { + package: {name: dict(event) for name, event in events.items()} + for package, events in self._events.items() } - for package, events in self._events.items() - }) + ) @action def get_all_responses(self, html_doc: Optional[bool] = None): @@ -326,17 +197,18 @@ class InspectPlugin(Plugin): :param html_doc: If True then the docstring will be parsed into HTML (default: False) """ with self._responses_lock: - if not self._responses or (html_doc is not None and html_doc != self._html_doc): + if not self._responses or ( + html_doc is not None and html_doc != self._html_doc + ): self._html_doc = html_doc self._init_responses() - return json.dumps({ - package: { - name: dict(event) - for name, event in responses.items() + return json.dumps( + { + package: {name: dict(event) for name, event in responses.items()} + for package, responses in self._responses.items() } - for package, responses in self._responses.items() - }) + ) @action def get_procedures(self) -> dict: diff --git a/platypush/plugins/inspect/_model.py b/platypush/plugins/inspect/_model.py new file mode 100644 index 00000000..b3e10d6b --- /dev/null +++ b/platypush/plugins/inspect/_model.py @@ -0,0 +1,187 @@ +from abc import ABC, abstractmethod +import inspect +import json +import re +from typing import Optional + +from platypush.utils import get_decorators + + +class Model(ABC): + def __str__(self): + return json.dumps(dict(self), indent=2, sort_keys=True) + + def __repr__(self): + return json.dumps(dict(self)) + + @staticmethod + def to_html(doc): + try: + import docutils.core # type: ignore + except ImportError: + # docutils not found + return doc + + return docutils.core.publish_parts(doc, writer_name='html')['html_body'] + + @abstractmethod + def __iter__(self): + raise NotImplementedError() + + +class ProcedureEncoder(json.JSONEncoder): + def default(self, o): + if callable(o): + return { + 'type': 'native_function', + } + + return super().default(o) + + +class BackendModel(Model): + def __init__(self, backend, prefix='', html_doc: Optional[bool] = False): + self.name = backend.__module__[len(prefix) :] + self.html_doc = html_doc + self.doc = ( + self.to_html(backend.__doc__) + if html_doc and backend.__doc__ + else backend.__doc__ + ) + + def __iter__(self): + for attr in ['name', 'doc', 'html_doc']: + yield attr, getattr(self, attr) + + +class PluginModel(Model): + def __init__(self, plugin, prefix='', html_doc: Optional[bool] = False): + self.name = plugin.__module__[len(prefix) :] + self.html_doc = html_doc + self.doc = ( + self.to_html(plugin.__doc__) + if html_doc and plugin.__doc__ + else plugin.__doc__ + ) + self.actions = { + action_name: ActionModel( + getattr(plugin, action_name), html_doc=html_doc or False + ) + for action_name in get_decorators(plugin, climb_class_hierarchy=True).get( + 'action', [] + ) + } + + def __iter__(self): + for attr in ['name', 'actions', 'doc', 'html_doc']: + if attr == 'actions': + # noinspection PyShadowingNames + yield attr, { + name: dict(action) for name, action in self.actions.items() + }, + else: + yield attr, getattr(self, attr) + + +class EventModel(Model): + def __init__(self, event, prefix='', html_doc: Optional[bool] = False): + self.package = event.__module__[len(prefix) :] + self.name = event.__name__ + self.html_doc = html_doc + self.doc = ( + self.to_html(event.__doc__) if html_doc and event.__doc__ else event.__doc__ + ) + + def __iter__(self): + for attr in ['name', 'doc', 'html_doc']: + yield attr, getattr(self, attr) + + +class ResponseModel(Model): + def __init__(self, response, prefix='', html_doc: Optional[bool] = False): + self.package = response.__module__[len(prefix) :] + self.name = response.__name__ + self.html_doc = html_doc + self.doc = ( + self.to_html(response.__doc__) + if html_doc and response.__doc__ + else response.__doc__ + ) + + def __iter__(self): + for attr in ['name', 'doc', 'html_doc']: + yield attr, getattr(self, attr) + + +class ActionModel(Model): + # noinspection PyShadowingNames + def __init__(self, action, html_doc: bool = False): + self.name = action.__name__ + self.doc, argsdoc = self._parse_docstring(action.__doc__, html_doc=html_doc) + self.args = {} + self.has_kwargs = False + + for arg in list(inspect.signature(action).parameters.values())[1:]: + if arg.kind == arg.VAR_KEYWORD: + self.has_kwargs = True + continue + + self.args[arg.name] = { + 'default': arg.default + if not issubclass(arg.default.__class__, type) + else None, + 'doc': argsdoc.get(arg.name), + } + + @classmethod + def _parse_docstring(cls, docstring: str, html_doc: bool = False): + new_docstring = '' + params = {} + cur_param = None + cur_param_docstring = '' + + if not docstring: + return None, {} + + for line in docstring.split('\n'): + m = re.match(r'^\s*:param ([^:]+):\s*(.*)', line) + if m: + if cur_param: + params[cur_param] = ( + cls.to_html(cur_param_docstring) + if html_doc + else cur_param_docstring + ) + + cur_param = m.group(1) + cur_param_docstring = m.group(2) + elif re.match(r'^\s*:[^:]+:\s*.*', line): + continue + else: + if cur_param: + if not line.strip(): + params[cur_param] = ( + cls.to_html(cur_param_docstring) + if html_doc + else cur_param_docstring + ) + cur_param = None + cur_param_docstring = '' + else: + cur_param_docstring += '\n' + line.strip() + else: + new_docstring += line.rstrip() + '\n' + + if cur_param: + params[cur_param] = ( + cls.to_html(cur_param_docstring) if html_doc else cur_param_docstring + ) + + return ( + new_docstring.strip() if not html_doc else cls.to_html(new_docstring), + params, + ) + + def __iter__(self): + for attr in ['name', 'args', 'doc', 'has_kwargs']: + yield attr, getattr(self, attr)