diff --git a/platypush/backend/mqtt/__init__.py b/platypush/backend/mqtt/__init__.py index 3ca89d32..f03ecb95 100644 --- a/platypush/backend/mqtt/__init__.py +++ b/platypush/backend/mqtt/__init__.py @@ -2,7 +2,7 @@ import hashlib import json import os import threading -from typing import Optional, List, Callable +from typing import Any, Dict, Optional, List, Callable import paho.mqtt.client as mqtt @@ -16,6 +16,10 @@ from platypush.plugins.mqtt import MqttPlugin as MQTTPlugin class MqttClient(mqtt.Client, threading.Thread): + """ + Wrapper class for an MQTT client executed in a separate thread. + """ + def __init__( self, *args, @@ -78,7 +82,7 @@ class MqttClient(mqtt.Client, threading.Thread): def unsubscribe(self, *topics, **kwargs): """ - Client unsubscription handler. + Client unsubscribe handler. """ if not topics: topics = self.topics @@ -127,9 +131,10 @@ class MqttBackend(Backend): def __init__( self, + *args, host: Optional[str] = None, port: int = _default_mqtt_port, - topic='platypush_bus_mq', + topic: str = 'platypush_bus_mq', subscribe_default_topic: bool = True, tls_cafile: Optional[str] = None, tls_certfile: Optional[str] = None, @@ -141,7 +146,6 @@ class MqttBackend(Backend): password: Optional[str] = None, client_id: Optional[str] = None, listeners=None, - *args, **kwargs, ): """ @@ -202,7 +206,7 @@ class MqttBackend(Backend): self.tls_insecure = tls_insecure self.username = username self.password = password - self.client_id: str = client_id or Config.get('device_id') # type: ignore + self.client_id: str = client_id or Config.get('device_id') else: client = get_plugin('mqtt') assert ( @@ -219,14 +223,14 @@ class MqttBackend(Backend): self.tls_insecure = client.tls_insecure self.username = client.username self.password = client.password - self.client_id: str = client_id or client.client_id # type: ignore + self.client_id = client_id or client.client_id - self.topic = '{}/{}'.format(topic, self.device_id) + self.topic = f'{topic}/{self.device_id}' self.subscribe_default_topic = subscribe_default_topic - self._listeners = {} # client_id -> MqttClient map + self._listeners: Dict[str, MqttClient] = {} # client_id -> MqttClient map self.listeners_conf = listeners or [] - def send_message(self, msg, topic: Optional[str] = None, **kwargs): + def send_message(self, msg, *_, topic: Optional[str] = None, **kwargs): try: client = get_plugin('mqtt') client.send_message( @@ -252,9 +256,8 @@ class MqttBackend(Backend): return os.path.abspath(os.path.expanduser(path)) if path else path def add_listeners(self, *listeners): - # noinspection PyShadowingNames,PyUnusedLocal for i, listener in enumerate(listeners): - host = listener.get('host') + host = listener.get('host', self.host) if host: port = listener.get('port', self._default_mqtt_port) username = listener.get('username') @@ -280,7 +283,7 @@ class MqttBackend(Backend): topics = listener.get('topics') if not topics: self.logger.warning( - 'No list of topics specified for listener n.{}'.format(i + 1) + 'No list of topics specified for listener n.%d', i + 1 ) continue @@ -308,21 +311,21 @@ class MqttBackend(Backend): port: int, topics: Optional[List[str]] = None, client_id: Optional[str] = None, - on_message: Optional[bool] = None, + on_message: Optional[Callable[[MqttClient, Any, mqtt.MQTTMessage], Any]] = None, ) -> str: - return '{client_id}-{client_hash}'.format( - client_id=client_id or self.client_id, - client_hash=hashlib.sha1( - '|'.join( - [ - host, - str(port), - json.dumps(sorted(topics or [])), - str(id(on_message)), - ] - ).encode() - ).hexdigest(), - ) + client_id = client_id or self.client_id + client_hash = hashlib.sha1( + '|'.join( + [ + host, + str(port), + json.dumps(sorted(topics or [])), + str(id(on_message)), + ] + ).encode() + ).hexdigest() + + return f'{client_id}-{client_hash}' def _get_client( self, @@ -367,47 +370,45 @@ class MqttBackend(Backend): on_message=on_message, ) - client.subscribe(*topics) + if topics: + client.subscribe(*topics) + return client def on_mqtt_message(self): - def handler(client, __, msg): + def handler(client: MqttClient, _, msg: mqtt.MQTTMessage): data = msg.payload - # noinspection PyBroadException try: data = data.decode('utf-8') data = json.loads(data) except Exception as e: self.logger.debug(str(e)) - # noinspection PyProtectedMember self.bus.post( MQTTMessageEvent( - host=client._host, port=client._port, topic=msg.topic, msg=data + host=client.host, port=client.port, topic=msg.topic, msg=data ) ) return handler def on_exec_message(self): - def handler(_, __, msg): - # noinspection PyShadowingNames + def handler(_, __, msg: mqtt.MQTTMessage): def response_thread(msg): response = self.get_message_response(msg) if not response: return - response_topic = '{}/responses/{}'.format(self.topic, msg.id) + response_topic = f'{self.topic}/responses/{msg.id}' self.logger.info( - 'Processing response on the MQTT topic {}: {}'.format( - response_topic, response - ) + 'Processing response on the MQTT topic %s: %s', + response_topic, + response, ) self.send_message(response, topic=response_topic) msg = msg.payload.decode('utf-8') - # noinspection PyBroadException try: msg = json.loads(msg) msg = Message.build(msg) @@ -417,7 +418,7 @@ class MqttBackend(Backend): if not msg: return - self.logger.info('Received message on the MQTT backend: {}'.format(msg)) + self.logger.info('Received message on the MQTT backend: %s', msg) try: self.on_message(msg) @@ -457,9 +458,10 @@ class MqttBackend(Backend): client.start() self.logger.info( - 'Initialized MQTT backend on host {}:{}, topic {}'.format( - self.host, self.port, self.topic - ) + 'Initialized MQTT backend on host %s:%d, topic=%s', + self.host, + self.port, + self.topic, ) self.add_listeners(*self.listeners_conf) @@ -471,7 +473,7 @@ class MqttBackend(Backend): try: listener.stop() except Exception as e: - self.logger.warning(f'Could not stop MQTT listener: {e}') + self.logger.warning('Could not stop MQTT listener: %s', e) self.logger.info('MQTT backend terminated')