platypush/platypush/plugins/linode/__init__.py

222 lines
7.2 KiB
Python

from typing import Collection, Dict, List, Optional
from typing_extensions import override
from linode_api4 import LinodeClient, Instance, objects
from platypush.context import get_bus
from platypush.entities.cloud import CloudInstance
from platypush.message.event.linode import LinodeInstanceStatusChanged
from platypush.schemas.linode import (
LinodeInstance,
LinodeInstanceSchema,
LinodeInstanceStatus,
)
from platypush.entities.managers.cloud import CloudInstanceEntityManager, InstanceId
from platypush.plugins import RunnablePlugin, action
class LinodePlugin(RunnablePlugin, CloudInstanceEntityManager):
"""
This plugin can interact with a Linode account and manage node and volumes.
To get your token:
- Login to <https://cloud.linode.com/>.
- Go to My Profile -> API Tokens -> Add a Personal Access Token.
- Select the scopes that you want to provide to your new token.
Requires:
* **linode_api4** (``pip install linode_api4``)
Triggers:
* :class:`platypush.message.event.linode.LinodeInstanceStatusChanged` when the status of an instance changes.
"""
def __init__(self, token: str, poll_interval: float = 10.0, **kwargs):
"""
:param token: Linode API token.
:param poll_interval: How often to poll the Linode API
(default: 60 seconds).
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self._token = token
self._instances: Dict[int, CloudInstance] = {}
""" ``{instance_id: CloudInstance}`` mapping. """
def _get_client(self, token: Optional[str] = None) -> LinodeClient:
"""
Get a :class:`LinodeClient` instance.
:param token: Override the default token.
"""
return LinodeClient(token or self._token)
def _get_instance(
self, instance: InstanceId, token: Optional[str] = None
) -> Instance:
"""
Get an instance by name or ID.
:param instance: The label, ID or host UUID of the instance.
:param token: Override the default token.
"""
client = self._get_client(token)
if isinstance(instance, str):
filters = Instance.label == instance
elif isinstance(instance, int):
filters = Instance.id == instance
else:
raise AssertionError(f'Invalid instance type: {type(instance)}')
instances = client.linode.instances(*filters)
assert instances, f'No such Linode instance: {instance}'
return instances[0]
def _linode_instance_to_dict(self, instance: Instance) -> dict:
"""
Convert an internal :class:`linode_api4.Instance` to a
dictionary representation that can be used to create a
:class:`platypush.entities.cloud.CloudInstance` object.
"""
return {
key: (value.dict if isinstance(value, objects.MappedObject) else value)
for key, value in instance.__dict__.items()
if not key.startswith('_')
}
@override
def main(self):
while not self.should_stop():
status = self._instances.copy()
new_status = self.status(publish_entities=False).output
changed_instances = (
[
instance
for instance in new_status
if not (
status.get(instance.id)
and status[instance.id].status == instance.status
)
]
if new_status
else []
)
if changed_instances:
for instance in changed_instances:
get_bus().post(
LinodeInstanceStatusChanged(
instance_id=instance.id,
instance=instance.name,
status=instance.status,
old_status=(
status[instance.id].status
if status.get(instance.id)
else None
),
)
)
self.publish_entities(changed_instances)
self._instances = new_status
self.wait_stop(self.poll_interval)
@override
def transform_entities(
self, entities: Collection[LinodeInstance]
) -> Collection[CloudInstance]:
schema = LinodeInstanceSchema()
return super().transform_entities(
[
CloudInstance(
reachable=instance.status == LinodeInstanceStatus.RUNNING,
**schema.dump(instance),
)
for instance in entities
]
)
@action
@override
def status(
self,
*_,
instance: Optional[InstanceId] = None,
token: Optional[str] = None,
publish_entities: bool = True,
**__,
) -> List[LinodeInstance]:
"""
Get the full status and info of the instances associated to a selected account.
:param token: Override the default access token if you want to query another account.
:param instance: Select only one instance, either by name, ID or host UUID.
:param publish_entities: Whether
:class:`platypush.message.event.entities.EntityUpdateEvent` should
be published for all the instances, whether or not their status has
changed (default: ``True``).
:return: .. schema:: linode.LinodeInstanceSchema(many=True)
"""
instances = (
[self._get_instance(instance=instance)]
if instance
else [
instance
for page in self._get_client(token).linode.instances().lists
for instance in page
]
)
mapped_instances = LinodeInstanceSchema(many=True).load(
map(self._linode_instance_to_dict, instances)
)
if publish_entities:
self.publish_entities(mapped_instances)
return mapped_instances
@override
@action
def reboot(self, instance: InstanceId, token: Optional[str] = None, **_):
"""
Reboot an instance.
:param instance: Instance ID, label or host UUID.
:param token: Default access token override.
"""
node = self._get_instance(instance=instance, token=token)
assert node.reboot(), 'Reboot failed'
@override
@action
def boot(self, instance: InstanceId, token: Optional[str] = None, **_):
"""
Boot an instance.
:param instance: Instance ID, label or host UUID.
:param token: Default access token override.
"""
node = self._get_instance(instance=instance, token=token)
assert node.boot(), 'Boot failed'
@override
@action
def shutdown(self, instance: InstanceId, token: Optional[str] = None, **_):
"""
Shutdown an instance.
:param instance: Instance ID, label or host UUID.
:param token: Default access token override.
"""
node = self._get_instance(instance=instance, token=token)
assert node.shutdown(), 'Shutdown failed'
# vim:sw=4:ts=4:et: