forked from platypush/platypush
More LINT fixes
This commit is contained in:
parent
a98a5f0980
commit
e04870209e
1 changed files with 45 additions and 43 deletions
|
@ -2,7 +2,7 @@ import hashlib
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
from typing import Optional, List, Callable
|
from typing import Any, Dict, Optional, List, Callable
|
||||||
|
|
||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
|
|
||||||
|
@ -16,6 +16,10 @@ from platypush.plugins.mqtt import MqttPlugin as MQTTPlugin
|
||||||
|
|
||||||
|
|
||||||
class MqttClient(mqtt.Client, threading.Thread):
|
class MqttClient(mqtt.Client, threading.Thread):
|
||||||
|
"""
|
||||||
|
Wrapper class for an MQTT client executed in a separate thread.
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
*args,
|
*args,
|
||||||
|
@ -78,7 +82,7 @@ class MqttClient(mqtt.Client, threading.Thread):
|
||||||
|
|
||||||
def unsubscribe(self, *topics, **kwargs):
|
def unsubscribe(self, *topics, **kwargs):
|
||||||
"""
|
"""
|
||||||
Client unsubscription handler.
|
Client unsubscribe handler.
|
||||||
"""
|
"""
|
||||||
if not topics:
|
if not topics:
|
||||||
topics = self.topics
|
topics = self.topics
|
||||||
|
@ -127,9 +131,10 @@ class MqttBackend(Backend):
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
|
*args,
|
||||||
host: Optional[str] = None,
|
host: Optional[str] = None,
|
||||||
port: int = _default_mqtt_port,
|
port: int = _default_mqtt_port,
|
||||||
topic='platypush_bus_mq',
|
topic: str = 'platypush_bus_mq',
|
||||||
subscribe_default_topic: bool = True,
|
subscribe_default_topic: bool = True,
|
||||||
tls_cafile: Optional[str] = None,
|
tls_cafile: Optional[str] = None,
|
||||||
tls_certfile: Optional[str] = None,
|
tls_certfile: Optional[str] = None,
|
||||||
|
@ -141,7 +146,6 @@ class MqttBackend(Backend):
|
||||||
password: Optional[str] = None,
|
password: Optional[str] = None,
|
||||||
client_id: Optional[str] = None,
|
client_id: Optional[str] = None,
|
||||||
listeners=None,
|
listeners=None,
|
||||||
*args,
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
|
@ -202,7 +206,7 @@ class MqttBackend(Backend):
|
||||||
self.tls_insecure = tls_insecure
|
self.tls_insecure = tls_insecure
|
||||||
self.username = username
|
self.username = username
|
||||||
self.password = password
|
self.password = password
|
||||||
self.client_id: str = client_id or Config.get('device_id') # type: ignore
|
self.client_id: str = client_id or Config.get('device_id')
|
||||||
else:
|
else:
|
||||||
client = get_plugin('mqtt')
|
client = get_plugin('mqtt')
|
||||||
assert (
|
assert (
|
||||||
|
@ -219,14 +223,14 @@ class MqttBackend(Backend):
|
||||||
self.tls_insecure = client.tls_insecure
|
self.tls_insecure = client.tls_insecure
|
||||||
self.username = client.username
|
self.username = client.username
|
||||||
self.password = client.password
|
self.password = client.password
|
||||||
self.client_id: str = client_id or client.client_id # type: ignore
|
self.client_id = client_id or client.client_id
|
||||||
|
|
||||||
self.topic = '{}/{}'.format(topic, self.device_id)
|
self.topic = f'{topic}/{self.device_id}'
|
||||||
self.subscribe_default_topic = subscribe_default_topic
|
self.subscribe_default_topic = subscribe_default_topic
|
||||||
self._listeners = {} # client_id -> MqttClient map
|
self._listeners: Dict[str, MqttClient] = {} # client_id -> MqttClient map
|
||||||
self.listeners_conf = listeners or []
|
self.listeners_conf = listeners or []
|
||||||
|
|
||||||
def send_message(self, msg, topic: Optional[str] = None, **kwargs):
|
def send_message(self, msg, *_, topic: Optional[str] = None, **kwargs):
|
||||||
try:
|
try:
|
||||||
client = get_plugin('mqtt')
|
client = get_plugin('mqtt')
|
||||||
client.send_message(
|
client.send_message(
|
||||||
|
@ -252,9 +256,8 @@ class MqttBackend(Backend):
|
||||||
return os.path.abspath(os.path.expanduser(path)) if path else path
|
return os.path.abspath(os.path.expanduser(path)) if path else path
|
||||||
|
|
||||||
def add_listeners(self, *listeners):
|
def add_listeners(self, *listeners):
|
||||||
# noinspection PyShadowingNames,PyUnusedLocal
|
|
||||||
for i, listener in enumerate(listeners):
|
for i, listener in enumerate(listeners):
|
||||||
host = listener.get('host')
|
host = listener.get('host', self.host)
|
||||||
if host:
|
if host:
|
||||||
port = listener.get('port', self._default_mqtt_port)
|
port = listener.get('port', self._default_mqtt_port)
|
||||||
username = listener.get('username')
|
username = listener.get('username')
|
||||||
|
@ -280,7 +283,7 @@ class MqttBackend(Backend):
|
||||||
topics = listener.get('topics')
|
topics = listener.get('topics')
|
||||||
if not topics:
|
if not topics:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'No list of topics specified for listener n.{}'.format(i + 1)
|
'No list of topics specified for listener n.%d', i + 1
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -308,10 +311,9 @@ class MqttBackend(Backend):
|
||||||
port: int,
|
port: int,
|
||||||
topics: Optional[List[str]] = None,
|
topics: Optional[List[str]] = None,
|
||||||
client_id: Optional[str] = None,
|
client_id: Optional[str] = None,
|
||||||
on_message: Optional[bool] = None,
|
on_message: Optional[Callable[[MqttClient, Any, mqtt.MQTTMessage], Any]] = None,
|
||||||
) -> str:
|
) -> str:
|
||||||
return '{client_id}-{client_hash}'.format(
|
client_id = client_id or self.client_id
|
||||||
client_id=client_id or self.client_id,
|
|
||||||
client_hash = hashlib.sha1(
|
client_hash = hashlib.sha1(
|
||||||
'|'.join(
|
'|'.join(
|
||||||
[
|
[
|
||||||
|
@ -321,8 +323,9 @@ class MqttBackend(Backend):
|
||||||
str(id(on_message)),
|
str(id(on_message)),
|
||||||
]
|
]
|
||||||
).encode()
|
).encode()
|
||||||
).hexdigest(),
|
).hexdigest()
|
||||||
)
|
|
||||||
|
return f'{client_id}-{client_hash}'
|
||||||
|
|
||||||
def _get_client(
|
def _get_client(
|
||||||
self,
|
self,
|
||||||
|
@ -367,47 +370,45 @@ class MqttBackend(Backend):
|
||||||
on_message=on_message,
|
on_message=on_message,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if topics:
|
||||||
client.subscribe(*topics)
|
client.subscribe(*topics)
|
||||||
|
|
||||||
return client
|
return client
|
||||||
|
|
||||||
def on_mqtt_message(self):
|
def on_mqtt_message(self):
|
||||||
def handler(client, __, msg):
|
def handler(client: MqttClient, _, msg: mqtt.MQTTMessage):
|
||||||
data = msg.payload
|
data = msg.payload
|
||||||
# noinspection PyBroadException
|
|
||||||
try:
|
try:
|
||||||
data = data.decode('utf-8')
|
data = data.decode('utf-8')
|
||||||
data = json.loads(data)
|
data = json.loads(data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.debug(str(e))
|
self.logger.debug(str(e))
|
||||||
|
|
||||||
# noinspection PyProtectedMember
|
|
||||||
self.bus.post(
|
self.bus.post(
|
||||||
MQTTMessageEvent(
|
MQTTMessageEvent(
|
||||||
host=client._host, port=client._port, topic=msg.topic, msg=data
|
host=client.host, port=client.port, topic=msg.topic, msg=data
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
return handler
|
return handler
|
||||||
|
|
||||||
def on_exec_message(self):
|
def on_exec_message(self):
|
||||||
def handler(_, __, msg):
|
def handler(_, __, msg: mqtt.MQTTMessage):
|
||||||
# noinspection PyShadowingNames
|
|
||||||
def response_thread(msg):
|
def response_thread(msg):
|
||||||
response = self.get_message_response(msg)
|
response = self.get_message_response(msg)
|
||||||
if not response:
|
if not response:
|
||||||
return
|
return
|
||||||
response_topic = '{}/responses/{}'.format(self.topic, msg.id)
|
response_topic = f'{self.topic}/responses/{msg.id}'
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Processing response on the MQTT topic {}: {}'.format(
|
'Processing response on the MQTT topic %s: %s',
|
||||||
response_topic, response
|
response_topic,
|
||||||
)
|
response,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.send_message(response, topic=response_topic)
|
self.send_message(response, topic=response_topic)
|
||||||
|
|
||||||
msg = msg.payload.decode('utf-8')
|
msg = msg.payload.decode('utf-8')
|
||||||
# noinspection PyBroadException
|
|
||||||
try:
|
try:
|
||||||
msg = json.loads(msg)
|
msg = json.loads(msg)
|
||||||
msg = Message.build(msg)
|
msg = Message.build(msg)
|
||||||
|
@ -417,7 +418,7 @@ class MqttBackend(Backend):
|
||||||
if not msg:
|
if not msg:
|
||||||
return
|
return
|
||||||
|
|
||||||
self.logger.info('Received message on the MQTT backend: {}'.format(msg))
|
self.logger.info('Received message on the MQTT backend: %s', msg)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.on_message(msg)
|
self.on_message(msg)
|
||||||
|
@ -457,9 +458,10 @@ class MqttBackend(Backend):
|
||||||
|
|
||||||
client.start()
|
client.start()
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
'Initialized MQTT backend on host {}:{}, topic {}'.format(
|
'Initialized MQTT backend on host %s:%d, topic=%s',
|
||||||
self.host, self.port, self.topic
|
self.host,
|
||||||
)
|
self.port,
|
||||||
|
self.topic,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.add_listeners(*self.listeners_conf)
|
self.add_listeners(*self.listeners_conf)
|
||||||
|
@ -471,7 +473,7 @@ class MqttBackend(Backend):
|
||||||
try:
|
try:
|
||||||
listener.stop()
|
listener.stop()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.warning(f'Could not stop MQTT listener: {e}')
|
self.logger.warning('Could not stop MQTT listener: %s', e)
|
||||||
|
|
||||||
self.logger.info('MQTT backend terminated')
|
self.logger.info('MQTT backend terminated')
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue