platypush/platypush/utils/manifest.py

693 lines
21 KiB
Python

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
import importlib
import inspect
import json
import logging
import os
import pathlib
import re
import shutil
import subprocess
import sys
from typing import (
Callable,
Dict,
Generator,
List,
Optional,
Iterable,
Mapping,
Sequence,
Set,
Type,
Union,
)
import yaml
from platypush.utils import get_src_root, is_root
_available_package_manager = None
logger = logging.getLogger(__name__)
class BaseImage(Enum):
"""
Supported base images for Dockerfiles.
"""
ALPINE = 'alpine'
DEBIAN = 'debian'
FEDORA = 'fedora'
UBUNTU = 'ubuntu'
def __str__(self) -> str:
"""
Explicit __str__ override for argparse purposes.
"""
return self.value
@dataclass
class OSMeta:
"""
Operating system metadata.
"""
name: str
description: str
class OS(Enum):
"""
Supported operating systems.
"""
ALPINE = OSMeta('alpine', 'Alpine')
ARCH = OSMeta('arch', 'Arch Linux')
DEBIAN = OSMeta('debian', 'Debian')
FEDORA = OSMeta('fedora', 'Fedora')
UBUNTU = OSMeta('ubuntu', 'Ubuntu')
@dataclass
class PackageManager:
"""
Representation of a package manager.
"""
executable: str
""" The executable name. """
default_os: OS
"""
The default distro whose configuration we should use if this package
manager is detected.
"""
install_doc: str
""" The base install command that will be used in the generated documentation. """
install: Sequence[str] = field(default_factory=tuple)
""" The install command, as a sequence of strings. """
uninstall: Sequence[str] = field(default_factory=tuple)
""" The uninstall command, as a sequence of strings. """
list: Sequence[str] = field(default_factory=tuple)
""" The command to list the installed packages. """
parse_list_line: Callable[[str], str] = field(default_factory=lambda: lambda s: s)
"""
Internal package-manager dependent function that parses the base package
name from a line returned by the list command.
"""
def _get_installed(self) -> Sequence[str]:
"""
:return: The context-aware list of installed packages.
It should only be used within the context of :meth:`.get_installed`.
"""
if os.environ.get('DOCKER_CTX'):
# If we're running in a Docker build context, don't run the package
# manager to retrieve the list of installed packages, as the host
# and guest systems have different environments.
return ()
return tuple(
line.strip()
for line in subprocess.Popen( # pylint: disable=consider-using-with
self.list, stdout=subprocess.PIPE
)
.communicate()[0]
.decode()
.split('\n')
if line.strip()
)
def get_installed(self) -> Sequence[str]:
"""
:return: The list of installed packages.
"""
return tuple(self.parse_list_line(line) for line in self._get_installed())
class PackageManagers(Enum):
"""
Supported package managers.
"""
APK = PackageManager(
executable='apk',
install_doc='apk add',
install=('apk', 'add', '--update', '--no-interactive', '--no-cache'),
uninstall=('apk', 'del', '--no-interactive'),
list=('apk', 'list', '--installed'),
default_os=OS.ALPINE,
parse_list_line=lambda line: re.sub(r'.*\s*\{(.+?)}\s*.*', r'\1', line),
)
APT = PackageManager(
executable='apt',
install_doc='apt install',
install=('apt', 'install', '-y'),
uninstall=('apt', 'remove', '-y'),
list=('apt', 'list', '--installed'),
default_os=OS.DEBIAN,
parse_list_line=lambda line: line.split('/')[0],
)
DNF = PackageManager(
executable='dnf',
install_doc='yum install',
install=('dnf', 'install', '-y'),
uninstall=('dnf', 'remove', '-y'),
list=('dnf', 'list', '--installed'),
default_os=OS.FEDORA,
parse_list_line=lambda line: re.split(r'\s+', line)[0].split('.')[0],
)
PACMAN = PackageManager(
executable='pacman',
install_doc='pacman -S',
install=('pacman', '-S', '--noconfirm', '--needed'),
uninstall=('pacman', '-R', '--noconfirm'),
list=('pacman', '-Q'),
default_os=OS.ARCH,
parse_list_line=lambda line: line.split(' ')[0],
)
@classmethod
def by_executable(cls, name: str) -> "PackageManagers":
"""
:param name: The name of the package manager executable to get the
package manager for.
:return: The `PackageManager` object for the given executable.
"""
pkg_manager = next(iter(pm for pm in cls if pm.value.executable == name), None)
if not pkg_manager:
raise ValueError(f'Unknown package manager: {name}')
return pkg_manager
@classmethod
def get_command(cls, name: str) -> Iterable[str]:
"""
:param name: The name of the package manager executable to get the
command for.
:return: The base command to execute, as a sequence of strings.
"""
pkg_manager = next(iter(pm for pm in cls if pm.value.executable == name), None)
if not pkg_manager:
raise ValueError(f'Unknown package manager: {name}')
return pkg_manager.value.install
@classmethod
def scan(cls) -> Optional["PackageManagers"]:
"""
Get the name of the available package manager on the system, if supported.
"""
# pylint: disable=global-statement
global _available_package_manager
if _available_package_manager:
return _available_package_manager
available_package_managers = [
pkg_manager
for pkg_manager in cls
if shutil.which(pkg_manager.value.executable)
]
if not available_package_managers:
logger.warning(
'\nYour OS does not provide any of the supported package managers.\n'
'You may have to install some optional dependencies manually.\n'
'Supported package managers: %s.\n',
', '.join([pm.value.executable for pm in cls]),
)
return None
_available_package_manager = available_package_managers[0]
return _available_package_manager
class InstallContext(Enum):
"""
Supported installation contexts.
"""
NONE = None
DOCKER = 'docker'
VENV = 'venv'
class ManifestType(Enum):
"""
Manifest types.
"""
PLUGIN = 'plugin'
BACKEND = 'backend'
@dataclass
class Dependencies:
"""
Dependencies for a plugin/backend.
"""
before: List[str] = field(default_factory=list)
""" Commands to execute before the component is installed. """
packages: Set[str] = field(default_factory=set)
""" System packages required by the component. """
pip: Set[str] = field(default_factory=set)
""" pip dependencies. """
after: List[str] = field(default_factory=list)
""" Commands to execute after the component is installed. """
pkg_manager: Optional[PackageManagers] = None
""" Override the default package manager detected on the system. """
install_context: InstallContext = InstallContext.NONE
""" The installation context - Docker, virtual environment or bare metal. """
base_image: Optional[BaseImage] = None
""" Base image used in case of Docker installations. """
by_pkg_manager: Dict[PackageManagers, Set[str]] = field(default_factory=dict)
""" All system dependencies, grouped by package manager. """
def to_dict(self):
return {
'before': self.before,
'packages': list(self.packages),
'pip': self.pip,
'after': self.after,
'by_pkg_manager': {
pkg_manager.value.executable: list(pkgs)
for pkg_manager, pkgs in self.by_pkg_manager.items()
},
}
@property
def _is_venv(self) -> bool:
"""
:return: True if the dependencies scanning logic is running either in a
virtual environment or in a virtual environment preparation
context.
"""
return (
self.install_context == InstallContext.VENV or sys.prefix != sys.base_prefix
)
@property
def _is_docker(self) -> bool:
"""
:return: True if the dependencies scanning logic is running either in a
Docker environment.
"""
return (
self.install_context == InstallContext.DOCKER
or bool(os.environ.get('DOCKER_CTX'))
or os.path.isfile('/.dockerenv')
)
@property
def _wants_sudo(self) -> bool:
"""
:return: True if the system dependencies should be installed with sudo.
"""
return not (self._is_docker or is_root())
@staticmethod
def _get_requirements_dir() -> str:
"""
:return: The root folder for the base installation requirements.
"""
return os.path.join(get_src_root(), 'install', 'requirements')
@classmethod
def _parse_requirements_file(
cls,
requirements_file: str,
install_context: InstallContext = InstallContext.NONE,
) -> Iterable[str]:
"""
:param requirements_file: The path to the requirements file.
:return: The list of requirements to install.
"""
with open(requirements_file, 'r') as f:
return {
line.strip()
for line in f
if not (
not line.strip()
or line.strip().startswith('#')
# Virtual environments will install all the Python
# dependencies via pip, so we should skip them here
or (
install_context == InstallContext.VENV
and cls._is_python_pkg(line.strip())
)
)
}
@classmethod
def _get_base_system_dependencies(
cls,
pkg_manager: Optional[PackageManagers] = None,
install_context: InstallContext = InstallContext.NONE,
) -> Iterable[str]:
"""
:return: The base system dependencies that should be installed on the
system.
"""
# Docker images will install the base packages through their own
# dedicated shell script, so don't report their base system
# requirements here.
if not (pkg_manager and install_context != InstallContext.DOCKER):
return set()
return cls._parse_requirements_file(
os.path.join(
cls._get_requirements_dir(),
pkg_manager.value.default_os.name.lower() + '.txt',
),
install_context,
)
@staticmethod
def _is_python_pkg(pkg: str) -> bool:
"""
Utility function that returns True if a given package is a Python
system package. These should be skipped during a virtual
environment installation, as the virtual environment will be
installed via pip.
"""
tokens = pkg.split('-')
return len(tokens) > 1 and tokens[0] in {'py3', 'python3', 'python'}
@classmethod
def from_config(
cls,
conf_file: Optional[str] = None,
pkg_manager: Optional[PackageManagers] = None,
install_context: InstallContext = InstallContext.NONE,
base_image: Optional[BaseImage] = None,
) -> "Dependencies":
"""
Parse the required dependencies from a configuration file.
"""
if not pkg_manager:
pkg_manager = PackageManagers.scan()
base_system_deps = cls._get_base_system_dependencies(
pkg_manager=pkg_manager, install_context=install_context
)
deps = cls(
packages=set(base_system_deps),
pkg_manager=pkg_manager,
install_context=install_context,
base_image=base_image,
)
for manifest in Manifests.by_config(conf_file, pkg_manager=pkg_manager):
deps.before += manifest.install.before
deps.pip.update(manifest.install.pip)
deps.packages.update(manifest.install.packages)
deps.after += manifest.install.after
return deps
def to_pkg_install_commands(self) -> Generator[str, None, None]:
"""
Generates the package manager commands required to install the given
dependencies on the system.
"""
pkg_manager = self.pkg_manager or PackageManagers.scan()
if self.packages and pkg_manager:
installed_packages = pkg_manager.value.get_installed()
to_install = sorted(
pkg
for pkg in self.packages # type: ignore
if pkg not in installed_packages
and not (
self.install_context == InstallContext.VENV
and self._is_python_pkg(pkg)
)
)
if to_install:
yield ' '.join(
[
*(['sudo'] if self._wants_sudo else []),
*pkg_manager.value.install,
'\\\n '.join(to_install),
]
)
def to_pip_install_commands(self, full_command=True) -> Generator[str, None, None]:
"""
Generates the pip commands required to install the given dependencies on
the system.
:param full_command: Whether to return the full pip command to execute
(as a single string) or the list of packages that will be installed
through another script.
"""
wants_break_system_packages = not (
# Docker installations shouldn't require --break-system-packages in
# pip, except for Debian
(self._is_docker and self.base_image != BaseImage.DEBIAN)
# --break-system-packages has been introduced in Python 3.10
or sys.version_info < (3, 11)
# If we're in a virtual environment then we don't need
# --break-system-packages
or self._is_venv
)
if self.pip:
deps = sorted(self.pip)
if full_command:
yield (
'pip install --no-input '
+ ('--no-cache-dir ' if self._is_docker else '')
+ (
'--break-system-packages '
if wants_break_system_packages
else ''
)
+ ' \\\n '.join(deps)
)
else:
for dep in deps:
yield dep
def to_install_commands(self) -> Generator[str, None, None]:
"""
Generates the commands required to install the given dependencies on
this system.
"""
for cmd in self.before:
yield cmd
for cmd in self.to_pkg_install_commands():
yield cmd
for cmd in self.to_pip_install_commands():
yield cmd
for cmd in self.after:
yield cmd
class Manifest(ABC):
"""
Base class for plugin/backend manifests.
"""
def __init__(
self,
package: str,
description: Optional[str] = None,
install: Optional[Dict[str, Iterable[str]]] = None,
events: Optional[Mapping] = None,
pkg_manager: Optional[PackageManagers] = None,
**_,
):
self._pkg_manager = pkg_manager or PackageManagers.scan()
self.description = description
self.install = self._init_deps(install or {})
self.events = self._init_events(events or {})
self.package = package
self.component_name = '.'.join(package.split('.')[2:])
self.component = None
@property
@abstractmethod
def manifest_type(self) -> ManifestType:
"""
:return: The type of the manifest.
"""
@property
def file(self) -> str:
"""
:return: The path to the manifest file.
"""
return os.path.join(
get_src_root(),
*self.package.split('.')[1:],
'manifest.yaml',
)
def _init_deps(self, install: Mapping[str, Iterable[str]]) -> Dependencies:
deps = Dependencies()
for key, items in install.items():
if key == 'pip':
deps.pip.update(items)
elif key == 'before':
deps.before += items
elif key == 'after':
deps.after += items
else:
deps.by_pkg_manager[PackageManagers.by_executable(key)] = set(items)
if self._pkg_manager and key == self._pkg_manager.value.executable:
deps.packages.update(items)
return deps
@staticmethod
def _init_events(
events: Union[Iterable[str], Mapping[str, Optional[str]]]
) -> Dict[Type, str]:
evt_dict = events if isinstance(events, Mapping) else {e: None for e in events}
ret = {}
for evt_name, doc in evt_dict.items():
evt_module_name, evt_class_name = evt_name.rsplit('.', 1)
try:
evt_module = importlib.import_module(evt_module_name)
evt_class = getattr(evt_module, evt_class_name)
except Exception as e:
raise AssertionError(f'Could not load event {evt_name}: {e}') from e
ret[evt_class] = doc or evt_class.__doc__
return ret
@classmethod
def from_file(
cls, filename: str, pkg_manager: Optional[PackageManagers] = None
) -> "Manifest":
"""
Parse a manifest filename into a ``Manifest`` class.
"""
with open(str(filename), 'r') as f:
manifest = yaml.safe_load(f).get('manifest', {})
assert 'type' in manifest, f'Manifest file {filename} has no type field'
comp_type = ManifestType(manifest.pop('type'))
manifest_class = cls.by_type(comp_type)
return manifest_class(**manifest, pkg_manager=pkg_manager)
@classmethod
def by_type(cls, manifest_type: ManifestType) -> Type["Manifest"]:
"""
:return: The manifest class corresponding to the given manifest type.
"""
if manifest_type == ManifestType.PLUGIN:
return PluginManifest
if manifest_type == ManifestType.BACKEND:
return BackendManifest
raise ValueError(f'Unknown manifest type: {manifest_type}')
def __repr__(self):
"""
:return: A JSON serialized representation of the manifest.
"""
return json.dumps(
{
'description': self.description,
'install': self.install,
'events': {
'.'.join([evt_type.__module__, evt_type.__name__]): doc
for evt_type, doc in self.events.items()
},
'type': self.manifest_type.value,
'package': self.package,
'component_name': self.component_name,
}
)
class PluginManifest(Manifest):
"""
Plugin manifest.
"""
@property
def manifest_type(self) -> ManifestType:
return ManifestType.PLUGIN
# pylint: disable=too-few-public-methods
class BackendManifest(Manifest):
"""
Backend manifest.
"""
@property
def manifest_type(self) -> ManifestType:
return ManifestType.BACKEND
class Manifests:
"""
General-purpose manifests utilities.
"""
@staticmethod
def by_base_class(
base_class: Type, pkg_manager: Optional[PackageManagers] = None
) -> Generator[Manifest, None, None]:
"""
Get all the manifest files declared under the base path of a given class
and parse them into :class:`Manifest` objects.
"""
for mf in pathlib.Path(os.path.dirname(inspect.getfile(base_class))).rglob(
'manifest.yaml'
):
yield Manifest.from_file(str(mf), pkg_manager=pkg_manager)
@staticmethod
def by_config(
conf_file: Optional[str] = None,
pkg_manager: Optional[PackageManagers] = None,
) -> Generator[Manifest, None, None]:
"""
Get all the manifest objects associated to the extensions declared in a
given configuration file.
"""
from platypush.config import Config
conf_args = []
if conf_file:
conf_args.append(conf_file)
Config.init(*conf_args)
app_dir = get_src_root()
for name in Config.get_backends().keys():
yield Manifest.from_file(
os.path.join(app_dir, 'backend', *name.split('.'), 'manifest.yaml'),
pkg_manager=pkg_manager,
)
for name in Config.get_plugins().keys():
yield Manifest.from_file(
os.path.join(app_dir, 'plugins', *name.split('.'), 'manifest.yaml'),
pkg_manager=pkg_manager,
)