platypush/platypush/plugins/inspect/__init__.py

290 lines
9.4 KiB
Python
Raw Normal View History

2019-12-08 16:25:03 +01:00
import importlib
import inspect
import json
import os
import pathlib
import pkgutil
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Optional
2019-12-08 16:25:03 +01:00
2019-12-30 10:16:55 +01:00
from platypush.backend import Backend
from platypush.common.db import override_definitions
from platypush.common.reflection import Integration, Message as MessageMetadata
from platypush.config import Config
2019-12-08 16:25:03 +01:00
from platypush.plugins import Plugin, action
from platypush.message import Message
2019-12-30 18:50:01 +01:00
from platypush.message.event import Event
from platypush.message.response import Response
from platypush.utils import get_enabled_backends, get_enabled_plugins
from platypush.utils.mock import auto_mocks
from platypush.utils.manifest import Manifest, Manifests
from ._cache import Cache
from ._serialize import ProcedureEncoder
2019-12-08 16:25:03 +01:00
class InspectPlugin(Plugin):
"""
This plugin can be used to inspect platypush plugins and backends
"""
_num_workers = 8
"""Number of threads to use for the inspection."""
2019-12-08 16:25:03 +01:00
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._cache_file = os.path.join(Config.get_cachedir(), 'components.json')
self._cache = Cache()
self._load_cache()
def _load_cache(self):
"""
Loads the components cache from disk.
"""
with self._cache.lock(), auto_mocks(), override_definitions():
try:
self._cache = Cache.load(self._cache_file)
except Exception as e:
self.logger.warning(
'Could not initialize the components cache from %s: %s',
self._cache_file,
e,
)
self._cache = Cache()
self._refresh_cache()
def _refresh_cache(self):
"""
Refreshes the components cache.
"""
cache_version_differs = self._cache.version != Cache.cur_version
with ThreadPoolExecutor(self._num_workers) as pool:
futures = []
for base_type in [Plugin, Backend]:
futures.append(
pool.submit(
self._scan_integrations,
base_type,
pool=pool,
force_refresh=cache_version_differs,
futures=futures,
)
)
for base_type in [Event, Response]:
futures.append(
pool.submit(
self._scan_modules,
base_type,
pool=pool,
force_refresh=cache_version_differs,
futures=futures,
)
)
2019-12-08 16:25:03 +01:00
while futures:
futures.pop().result()
2019-12-08 16:25:03 +01:00
if self._cache.has_changes:
self.logger.info('Saving new components cache to %s', self._cache_file)
self._cache.dump(self._cache_file)
self._cache.loaded_at = self._cache.saved_at
2019-12-30 10:16:55 +01:00
def _scan_integration(self, manifest: Manifest):
"""
Scans a single integration from the manifest and adds it to the cache.
"""
try:
self._cache_integration(Integration.from_manifest(manifest.file))
except Exception as e:
self.logger.warning(
'Could not import module %s: %s',
manifest.package,
e,
)
2019-12-30 10:16:55 +01:00
def _scan_integrations(
self,
base_type: type,
pool: ThreadPoolExecutor,
futures: List[Future],
force_refresh: bool = False,
):
"""
Scans the integrations with a manifest file (plugins and backends) and
refreshes the cache.
"""
for manifest in Manifests.by_base_class(base_type):
# An integration metadata needs to be refreshed if it's been
# modified since it was last loaded, or if it's not in the
# cache.
if force_refresh or self._needs_refresh(manifest.file):
futures.append(pool.submit(self._scan_integration, manifest))
2019-12-30 10:16:55 +01:00
def _scan_module(self, base_type: type, modname: str):
"""
Scans a single module for objects that match the given base_type and
adds them to the cache.
"""
try:
module = importlib.import_module(modname)
except Exception as e:
self.logger.warning('Could not import module %s: %s', modname, e)
return
2019-12-30 18:50:01 +01:00
for _, obj_type in inspect.getmembers(module):
if (
inspect.isclass(obj_type)
and issubclass(obj_type, base_type)
# Exclude the base_type itself
and obj_type != base_type
):
self.logger.info(
'Scanned %s: %s',
base_type.__name__,
f'{module.__name__}.{obj_type.__name__}',
)
2019-12-30 18:50:01 +01:00
self._cache.set(
base_type, obj_type, MessageMetadata.by_type(obj_type).to_dict()
)
def _scan_modules(
self,
base_type: type,
pool: ThreadPoolExecutor,
futures: List[Future],
force_refresh: bool = False,
):
"""
A generator that scans the modules given a ``base_type`` (e.g. ``Event``).
2020-03-05 23:19:26 +01:00
It's a bit more inefficient than :meth:`._scan_integrations` because it
needs to inspect all the members of a module to find the ones that
match the given ``base_type``, but it works fine for simple components
(like messages) that don't require extra recursive parsing and don't
have a manifest.
"""
prefix = base_type.__module__ + '.'
path = str(pathlib.Path(inspect.getfile(base_type)).parent)
for _, modname, __ in pkgutil.walk_packages(
path=[path], prefix=prefix, onerror=lambda _: None
):
try:
filename = self._module_filename(path, '.'.join(modname.split('.')[3:]))
if not (force_refresh or self._needs_refresh(filename)):
continue
except Exception as e:
self.logger.warning('Could not scan module %s: %s', modname, e)
continue
futures.append(pool.submit(self._scan_module, base_type, modname))
def _needs_refresh(self, filename: str) -> bool:
"""
:return: True if the given file needs to be refreshed in the cache.
"""
return os.lstat(os.path.dirname(filename)).st_mtime > (
self._cache.saved_at or 0
)
@staticmethod
def _module_filename(path: str, modname: str) -> str:
"""
:param path: Path to the module.
:param modname: Module name.
:return: The full path to the module file.
"""
filename = os.path.join(path, *modname.split('.')) + '.py'
if not os.path.isfile(filename):
filename = os.path.join(path, *modname.split('.'), '__init__.py')
assert os.path.isfile(filename), f'No such file or directory: {filename}'
return filename
def _cache_integration(self, integration: Integration) -> dict:
"""
:param integration: The :class:`.IntegrationMetadata` object.
:return: The initialized component's metadata dict.
"""
self.logger.info(
'Scanned %s: %s', integration.base_type.__name__, integration.name
)
meta = integration.to_dict()
self._cache.set(integration.base_type, integration.type, meta)
return meta
2019-12-08 16:25:03 +01:00
@action
def get_all_plugins(self):
2019-12-08 16:25:03 +01:00
"""
Get information about all the available plugins.
2019-12-08 16:25:03 +01:00
"""
return json.dumps(self._cache.to_dict().get('plugins', {}), cls=Message.Encoder)
2019-12-08 16:25:03 +01:00
2019-12-30 10:16:55 +01:00
@action
def get_all_backends(self):
2019-12-30 10:16:55 +01:00
"""
Get information about all the available backends.
2019-12-30 10:16:55 +01:00
"""
return json.dumps(
self._cache.to_dict().get('backends', {}), cls=Message.Encoder
)
2019-12-30 10:16:55 +01:00
2019-12-30 18:50:01 +01:00
@action
def get_all_events(self):
2019-12-30 18:50:01 +01:00
"""
Get information about all the available events.
2019-12-30 18:50:01 +01:00
"""
return json.dumps(self._cache.to_dict().get('events', {}), cls=Message.Encoder)
2019-12-30 18:50:01 +01:00
@action
def get_all_responses(self):
"""
Get information about all the available responses.
"""
return json.dumps(
self._cache.to_dict().get('responses', {}), cls=Message.Encoder
)
@action
def get_procedures(self) -> dict:
"""
Get the list of procedures installed on the device.
"""
return json.loads(json.dumps(Config.get_procedures(), cls=ProcedureEncoder))
@action
def get_config(self, entry: Optional[str] = None) -> Optional[dict]:
"""
Return the configuration of the application or of a section.
:param entry: [Optional] configuration entry name to retrieve (e.g. ``workdir`` or ``backend.http``).
:return: The requested configuration object.
"""
if entry:
return Config.get(entry)
return Config.get()
@action
def get_enabled_plugins(self) -> List[str]:
"""
Get the list of enabled plugins.
"""
return list(get_enabled_plugins().keys())
@action
def get_enabled_backends(self) -> List[str]:
"""
Get the list of enabled backends.
"""
return list(get_enabled_backends().keys())
2019-12-08 16:25:03 +01:00
# vim:sw=4:ts=4:et: