Compare commits

...

3 Commits

Author SHA1 Message Date
Fabio Manganiello e49a0aec4d
Various improvements.
- Better synchronization logic on stop for `AsyncRunnablePlugin`.
- Fixed several thread names by dropping `prctl.set_name` in favour of
  specifying the name directly on thread creation.
- Several LINT fixes.
2023-02-08 00:46:50 +01:00
Fabio Manganiello 9d028af524
Removed last reference of `SwitchPlugin` 2023-02-05 23:10:35 +01:00
Fabio Manganiello 419a0cec61
More LINTing
Better prototype for `MultiLevelSwitchEntityManager.set_value`
2023-02-05 23:07:43 +01:00
10 changed files with 150 additions and 199 deletions

View File

@ -6,14 +6,14 @@ from multiprocessing import Process
try:
from websockets.exceptions import ConnectionClosed
from websockets import serve as websocket_serve
from websockets import serve as websocket_serve # type: ignore
except ImportError:
from websockets import ConnectionClosed, serve as websocket_serve
from websockets import ConnectionClosed, serve as websocket_serve # type: ignore
from platypush.backend import Backend
from platypush.backend.http.app import application
from platypush.context import get_or_create_event_loop
from platypush.utils import get_ssl_server_context, set_thread_name
from platypush.utils import get_ssl_server_context
class HttpBackend(Backend):
@ -186,7 +186,7 @@ class HttpBackend(Backend):
maps=None,
run_externally=False,
uwsgi_args=None,
**kwargs
**kwargs,
):
"""
:param port: Listen port for the web server (default: 8008)
@ -283,15 +283,13 @@ class HttpBackend(Backend):
'--enable-threads',
]
self.local_base_url = '{proto}://localhost:{port}'.format(
proto=('https' if ssl_cert else 'http'), port=self.port
)
protocol = 'https' if ssl_cert else 'http'
self.local_base_url = f'{protocol}://localhost:{self.port}'
self._websocket_lock_timeout = 10
self._websocket_lock = threading.RLock()
self._websocket_locks = {}
def send_message(self, msg, **kwargs):
def send_message(self, msg, *_, **__):
self.logger.warning('Use cURL or any HTTP client to query the HTTP backend')
def on_stop(self):
@ -351,9 +349,7 @@ class HttpBackend(Backend):
timeout=self._websocket_lock_timeout
)
if not acquire_ok:
raise TimeoutError(
'Websocket on address {} not ready to receive data'.format(addr)
)
raise TimeoutError(f'Websocket on address {addr} not ready to receive data')
def _release_websocket_lock(self, ws):
try:
@ -368,7 +364,7 @@ class HttpBackend(Backend):
self._websocket_locks[addr].release()
except Exception as e:
self.logger.warning(
'Unhandled exception while releasing websocket lock: {}'.format(str(e))
'Unhandled exception while releasing websocket lock: %s', e
)
finally:
self._websocket_lock.release()
@ -381,7 +377,7 @@ class HttpBackend(Backend):
self._acquire_websocket_lock(ws)
await ws.send(str(event))
except Exception as e:
self.logger.warning('Error on websocket send_event: {}'.format(e))
self.logger.warning('Error on websocket send_event: %s', e)
finally:
self._release_websocket_lock(ws)
@ -393,7 +389,7 @@ class HttpBackend(Backend):
loop.run_until_complete(send_event(_ws))
except ConnectionClosed:
self.logger.warning(
'Websocket client {} connection lost'.format(_ws.remote_address)
'Websocket client %s connection lost', _ws.remote_address
)
self.active_websockets.remove(_ws)
if _ws.remote_address in self._websocket_locks:
@ -401,7 +397,6 @@ class HttpBackend(Backend):
def websocket(self):
"""Websocket main server"""
set_thread_name('WebsocketServer')
async def register_websocket(websocket, path):
address = (
@ -411,16 +406,14 @@ class HttpBackend(Backend):
)
self.logger.info(
'New websocket connection from {} on path {}'.format(address, path)
'New websocket connection from %s on path %s', address, path
)
self.active_websockets.add(websocket)
try:
await websocket.recv()
except ConnectionClosed:
self.logger.info(
'Websocket client {} closed connection'.format(address)
)
self.logger.info('Websocket client %s closed connection', address)
self.active_websockets.remove(websocket)
if address in self._websocket_locks:
del self._websocket_locks[address]
@ -435,14 +428,14 @@ class HttpBackend(Backend):
register_websocket,
self.bind_address,
self.websocket_port,
**websocket_args
**websocket_args,
)
)
self._websocket_loop.run_forever()
def _start_web_server(self):
def proc():
self.logger.info('Starting local web server on port {}'.format(self.port))
self.logger.info('Starting local web server on port %s', self.port)
kwargs = {
'host': self.bind_address,
'port': self.port,
@ -470,7 +463,10 @@ class HttpBackend(Backend):
if not self.disable_websocket:
self.logger.info('Initializing websocket interface')
self.websocket_thread = threading.Thread(target=self.websocket)
self.websocket_thread = threading.Thread(
target=self.websocket,
name='WebsocketServer',
)
self.websocket_thread.start()
if not self.run_externally:
@ -481,13 +477,13 @@ class HttpBackend(Backend):
self.server_proc.join()
elif self.uwsgi_args:
uwsgi_cmd = ['uwsgi'] + self.uwsgi_args
self.logger.info('Starting uWSGI with arguments {}'.format(uwsgi_cmd))
self.logger.info('Starting uWSGI with arguments %s', uwsgi_cmd)
self.server_proc = subprocess.Popen(uwsgi_cmd)
else:
self.logger.info(
'The web server is configured to be launched externally but '
+ 'no uwsgi_args were provided. Make sure that you run another external service'
+ 'for the webserver (e.g. nginx)'
'no uwsgi_args were provided. Make sure that you run another external service'
'for the web server (e.g. nginx)'
)
self._service_registry_thread = threading.Thread(target=self._register_service)

View File

@ -13,7 +13,6 @@ from platypush.message import Message
from platypush.message.event.mqtt import MQTTMessageEvent
from platypush.message.request import Request
from platypush.plugins.mqtt import MqttPlugin as MQTTPlugin
from platypush.utils import set_thread_name
class MqttClient(mqtt.Client, threading.Thread):
@ -39,6 +38,7 @@ class MqttClient(mqtt.Client, threading.Thread):
mqtt.Client.__init__(self, *args, client_id=client_id, **kwargs)
threading.Thread.__init__(self)
self.name = f'MQTTClient:{client_id}'
self.host = host
self.port = port
self.topics = set(topics or [])
@ -393,7 +393,6 @@ class MqttBackend(Backend):
def handler(_, __, msg):
# noinspection PyShadowingNames
def response_thread(msg):
set_thread_name('MQTTProcessor')
response = self.get_message_response(msg)
if not response:
return
@ -428,7 +427,9 @@ class MqttBackend(Backend):
if isinstance(msg, Request):
threading.Thread(
target=response_thread, name='MQTTProcessor', args=(msg,)
target=response_thread,
name='MQTTProcessorResponseThread',
args=(msg,),
).start()
return handler

View File

@ -35,7 +35,7 @@ class MultiLevelSwitchEntityManager(EntityManager, ABC):
@abstractmethod
def set_value( # pylint: disable=redefined-builtin
self, *entities, property=None, data=None, **__
self, *_, property=None, data=None, **__
):
"""Set a value"""
raise NotImplementedError()

View File

@ -1,7 +1,6 @@
import asyncio
import logging
import threading
import time
from abc import ABC, abstractmethod
from functools import wraps
@ -11,7 +10,7 @@ from platypush.bus import Bus
from platypush.common import ExtensionWithManifest
from platypush.event import EventGenerator
from platypush.message.response import Response
from platypush.utils import get_decorators, get_plugin_name_by_class, set_thread_name
from platypush.utils import get_decorators, get_plugin_name_by_class
PLUGIN_STOP_TIMEOUT = 5 # Plugin stop timeout in seconds
@ -107,25 +106,22 @@ class RunnablePlugin(Plugin):
return self._should_stop.wait(timeout=timeout)
def start(self):
set_thread_name(self.__class__.__name__)
self._thread = threading.Thread(target=self._runner)
self._thread = threading.Thread(
target=self._runner, name=self.__class__.__name__
)
self._thread.start()
def stop(self):
self._should_stop.set()
if self._thread and self._thread.is_alive():
self.logger.info('Waiting for %s to stop', self.__class__.__name__)
self.logger.info('Waiting for the plugin to stop')
try:
if self._thread:
self._thread.join(timeout=self._stop_timeout)
if self._thread and self._thread.is_alive():
self.logger.warning(
'Timeout (seconds={%s}) on exit for the plugin %s',
'Timeout (seconds=%s) on exit for the plugin',
self._stop_timeout,
(
get_plugin_name_by_class(self.__class__)
or self.__class__.__name__
),
)
except Exception as e:
self.logger.warning('Could not join thread on stop: %s', e)
@ -142,7 +138,7 @@ class RunnablePlugin(Plugin):
self.logger.exception(e)
if self.poll_interval:
time.sleep(self.poll_interval)
self.wait_stop(self.poll_interval)
self._thread = None
@ -156,18 +152,27 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC):
super().__init__(*args, **kwargs)
self._loop: Optional[asyncio.AbstractEventLoop] = None
self._loop_runner: Optional[threading.Thread] = None
self._task: Optional[asyncio.Task] = None
@property
def _should_start_runner(self):
"""
This property is used to determine if the runner and the event loop
should be started for this plugin.
"""
return True
@abstractmethod
async def listen(self):
pass
"""
Main body of the async plugin. When it's called, the event loop should
already be running and available over `self._loop`.
"""
async def _listen(self):
"""
Wrapper for :meth:`.listen` that catches any exceptions and logs them.
"""
try:
await self.listen()
except KeyboardInterrupt:
@ -179,41 +184,37 @@ class AsyncRunnablePlugin(RunnablePlugin, ABC):
):
raise e
def _start_listener(self):
set_thread_name(self.__class__.__name__ + ':listener')
def _run_listener(self):
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
self._task = self._loop.create_task(self._listen())
if hasattr(self._task, 'set_name'):
self._task.set_name(self.__class__.__name__ + '.listen')
self._loop.run_forever()
try:
self._loop.run_until_complete(self._task)
except Exception as e:
self.logger.info('The loop has terminated with an error: %s', e)
self._task.cancel()
def main(self):
if self.should_stop() or (self._loop_runner and self._loop_runner.is_alive()):
self.logger.info('The main loop is already being run/stopped')
if self.should_stop():
self.logger.info('The plugin is already scheduled to stop')
return
self._loop = asyncio.new_event_loop()
if self._should_start_runner:
self._loop_runner = threading.Thread(target=self._start_listener)
self._loop_runner.start()
self._run_listener()
self.wait_stop()
def stop(self):
if self._task and self._loop and not self._task.done():
self._loop.call_soon_threadsafe(self._task.cancel)
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._loop.stop)
self._loop = None
if self._loop_runner and self._loop_runner.is_alive():
try:
self._loop_runner.join(timeout=self._stop_timeout)
finally:
self._loop_runner = None
super().stop()

View File

@ -1,7 +1,7 @@
import asyncio
import json
import os
from typing import Optional, Collection, Mapping
from typing import Any, Callable, Dict, Optional, Collection, Mapping
import requests
import websockets
@ -53,12 +53,12 @@ class NtfyPlugin(AsyncRunnablePlugin):
reconnect_wait_secs_max = 60
while not self.should_stop():
self.logger.debug(f'Connecting to {url}')
self.logger.debug('Connecting to %s', url)
try:
async with websockets.connect(url) as ws: # type: ignore
async with websockets.connect(url) as ws: # pylint: disable=no-member
reconnect_wait_secs = 1
self.logger.info(f'Connected to {url}')
self.logger.info('Connected to %s', url)
async for msg in ws:
try:
msg = json.loads(msg)
@ -122,7 +122,7 @@ class NtfyPlugin(AsyncRunnablePlugin):
tags: Optional[Collection[str]] = None,
schedule: Optional[str] = None,
):
"""
r"""
Send a message/notification to a topic.
:param topic: Topic where the message will be delivered.
@ -148,36 +148,36 @@ class NtfyPlugin(AsyncRunnablePlugin):
- ``broadcast``: Send an `Android broadcast <https://developer.android.com/guide/components/broadcasts>`
intent upon action selection (only available on Android).
Example:
Actions example:
.. code-block:: json
.. code-block:: json
[
{
"action": "view",
"label": "Open portal",
"url": "https://home.nest.com/",
"clear": true
[
{
"action": "view",
"label": "Open portal",
"url": "https://home.nest.com/",
"clear": true
},
{
"action": "http",
"label": "Turn down",
"url": "https://api.nest.com/",
"method": "PUT",
"headers": {
"Authorization": "Bearer abcdef..."
},
{
"action": "http",
"label": "Turn down",
"url": "https://api.nest.com/",
"method": "PUT",
"headers": {
"Authorization": "Bearer abcdef..."
},
"body": "{\\"temperature\\": 65}"
},
{
"action": "broadcast",
"label": "Take picture",
"intent": "com.myapp.TAKE_PICTURE_INTENT",
"extras": {
"camera": "front"
}
"body": "{\\"temperature\\": 65}"
},
{
"action": "broadcast",
"label": "Take picture",
"intent": "com.myapp.TAKE_PICTURE_INTENT",
"extras": {
"camera": "front"
}
]
}
]
:param email: Forward the notification as an email to the specified
address.
@ -196,9 +196,9 @@ class NtfyPlugin(AsyncRunnablePlugin):
``tomorrow, 3pm``)
"""
method = requests.post
method: Callable[..., requests.Response] = requests.post
url = server_url or self._server_url
args = {}
args: Dict[str, Any] = {}
if username and password:
args['auth'] = (username, password)
@ -247,8 +247,8 @@ class NtfyPlugin(AsyncRunnablePlugin):
}
rs = method(url, **args)
assert rs.ok, 'Could not send message to {}: {}'.format(
topic, rs.json().get('error', f'HTTP error: {rs.status_code}')
assert rs.ok, f'Could not send message to {topic}: ' + rs.json().get(
'error', f'HTTP error: {rs.status_code}'
)
return rs.json()

View File

@ -859,8 +859,9 @@ class SmartthingsPlugin(
return self.status(device)
@action
def set_value( # pylint: disable=arguments-differ,redefined-builtin
self, device: str, property: Optional[str] = None, data=None, **kwargs
# pylint: disable=redefined-builtin,arguments-differ
def set_value(
self, device: str, *_, property: Optional[str] = None, data=None, **kwargs
):
"""
Set the value of a device. It is compatible with the generic

View File

@ -1,72 +0,0 @@
from abc import ABC, abstractmethod
from typing import List, Union
from platypush.plugins import Plugin, action
# TODO REMOVE ME
class SwitchPlugin(Plugin, ABC):
"""
Abstract class for interacting with switch devices
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
@action
@abstractmethod
def on(self, device, *args, **kwargs):
"""Turn the device on"""
raise NotImplementedError()
@action
@abstractmethod
def off(self, device, *args, **kwargs):
"""Turn the device off"""
raise NotImplementedError()
@action
@abstractmethod
def toggle(self, device, *args, **kwargs):
"""Toggle the device status (on/off)"""
raise NotImplementedError()
@action
def switch_status(self, device=None) -> Union[dict, List[dict]]:
"""
Get the status of a specified device or of all the configured devices (default).
:param device: Filter by device name or ID.
:return: .. schema:: switch.SwitchStatusSchema(many=True)
"""
devices = self.switches
if device:
devices = [
d
for d in self.switches
if d.get('id') == device or d.get('name') == device
]
return devices[0] if devices else []
return devices
@action
def status(self, device=None, *_, **__) -> Union[dict, List[dict]]:
"""
Get the status of all the devices, or filter by device name or ID (alias for :meth:`.switch_status`).
:param device: Filter by device name or ID.
:return: .. schema:: switch.SwitchStatusSchema(many=True)
"""
return self.switch_status(device)
@property
@abstractmethod
def switches(self) -> List[dict]:
"""
:return: .. schema:: switch.SwitchStatusSchema(many=True)
"""
raise NotImplementedError()
# vim:sw=4:ts=4:et:

View File

@ -1105,7 +1105,7 @@ class SwitchbotPlugin(
@action
# pylint: disable=redefined-builtin,arguments-differ
def set_value(self, device: str, property=None, data=None, **__):
def set_value(self, device: str, *_, property=None, data=None, **__):
entity = self._to_entity(device, property)
assert entity, f'No such entity: "{device}"'

View File

@ -1,16 +1,16 @@
import enum
import time
from typing import List, Optional
from typing import Any, Collection, Dict, List, Optional
from platypush.entities import EnumSwitchEntityManager
from platypush.entities.switches import EnumSwitch
from platypush.message.response.bluetooth import BluetoothScanResponse
from platypush.plugins import action
from platypush.plugins.bluetooth.ble import BluetoothBlePlugin
from platypush.plugins.switch import SwitchPlugin
class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
SwitchPlugin, BluetoothBlePlugin
):
# pylint: disable=too-many-ancestors
class SwitchbotBluetoothPlugin(BluetoothBlePlugin, EnumSwitchEntityManager):
"""
Plugin to interact with a Switchbot (https://www.switch-bot.com/) device and
programmatically control switches over a Bluetooth interface.
@ -91,7 +91,7 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
raise e
time.sleep(5)
return self.status(device) # type: ignore
return self.status(device)
@action
def press(self, device):
@ -127,6 +127,30 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
"""
return self._run(device, self.Command.OFF)
@action
# pylint: disable=arguments-differ
def set_value(self, device: str, data: str, *_, **__):
"""
Entity-compatible ``set_value`` method to send a command to a device.
:param device: Device name or address
:param data: Command to send. Possible values are:
- ``on``: Press the button and remain in the pressed state.
- ``off``: Release a previously pressed button.
- ``press``: Press and release the button.
"""
if data == 'on':
return self.on(device)
if data == 'off':
return self.off(device)
if data == 'press':
return self.press(device)
self.logger.warning('Unknown command for SwitchBot "%s": "%s"', device, data)
return None
@action
def scan(
self, interface: Optional[str] = None, duration: int = 10
@ -138,14 +162,16 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
:param duration: Scan duration in seconds
"""
devices = super().scan(interface=interface, duration=duration).devices
compatible_devices = {}
compatible_devices: Dict[str, Any] = {}
devices = (
super().scan(interface=interface, duration=duration).devices # type: ignore
)
for dev in devices:
try:
characteristics = [
chrc
for chrc in self.discover_characteristics(
for chrc in self.discover_characteristics( # type: ignore
dev['addr'],
channel_type='random',
wait=False,
@ -157,14 +183,14 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
if characteristics:
compatible_devices[dev['addr']] = None
except Exception as e:
self.logger.warning('Device scan error', e)
self.logger.warning('Device scan error: %s', e)
self.publish_entities(compatible_devices) # type: ignore
self.publish_entities(compatible_devices)
return BluetoothScanResponse(devices=compatible_devices)
@property
def switches(self) -> List[dict]:
self.publish_entities(self.configured_devices) # type: ignore
@action
def status(self, *_, **__) -> List[dict]:
self.publish_entities(self.configured_devices)
return [
{
'address': addr,
@ -175,20 +201,17 @@ class SwitchbotBluetoothPlugin( # lgtm [py/missing-call-to-init]
for addr, name in self.configured_devices.items()
]
def transform_entities(self, devices: dict):
from platypush.entities.switches import Switch
return super().transform_entities( # type: ignore
[
Switch(
id=addr,
name=name,
state=False,
is_write_only=True,
)
for addr, name in devices.items()
]
)
def transform_entities(self, entities: Collection[dict]) -> Collection[EnumSwitch]:
return [
EnumSwitch(
id=addr,
name=name,
value='on',
values=['on', 'off', 'press'],
is_write_only=True,
)
for addr, name in entities
]
# vim:sw=4:ts=4:et:

View File

@ -1072,8 +1072,9 @@ class ZigbeeMqttPlugin(
return properties
@action
def set_value( # pylint: disable=redefined-builtin,arguments-differ
self, device: str, property: Optional[str] = None, data=None, **kwargs
# pylint: disable=redefined-builtin,arguments-differ
def set_value(
self, device: str, *_, property: Optional[str] = None, data=None, **kwargs
):
"""
Entity-compatible way of setting a value on a node.