forked from platypush/platypush
Added logic to prevent socket leakage from paho-mqtt upon client stop.
This commit is contained in:
parent
3f4168eb69
commit
07c88c9530
2 changed files with 23 additions and 12 deletions
|
@ -178,7 +178,6 @@ class MqttPlugin(RunnablePlugin):
|
||||||
host: str,
|
host: str,
|
||||||
port: int,
|
port: int,
|
||||||
client_id: Optional[str] = None,
|
client_id: Optional[str] = None,
|
||||||
on_message: Optional[MqttCallback] = None,
|
|
||||||
topics: Iterable[str] = (),
|
topics: Iterable[str] = (),
|
||||||
**_,
|
**_,
|
||||||
) -> str:
|
) -> str:
|
||||||
|
@ -193,7 +192,6 @@ class MqttPlugin(RunnablePlugin):
|
||||||
host,
|
host,
|
||||||
str(port),
|
str(port),
|
||||||
json.dumps(sorted(topics)),
|
json.dumps(sorted(topics)),
|
||||||
str(id(on_message)),
|
|
||||||
]
|
]
|
||||||
).encode()
|
).encode()
|
||||||
).hexdigest()
|
).hexdigest()
|
||||||
|
@ -344,7 +342,6 @@ class MqttPlugin(RunnablePlugin):
|
||||||
host=kwargs['host'],
|
host=kwargs['host'],
|
||||||
port=kwargs['port'],
|
port=kwargs['port'],
|
||||||
client_id=client_id,
|
client_id=client_id,
|
||||||
on_message=on_message,
|
|
||||||
topics=topics,
|
topics=topics,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -435,14 +432,8 @@ class MqttPlugin(RunnablePlugin):
|
||||||
response_buffer.close()
|
response_buffer.close()
|
||||||
|
|
||||||
if client:
|
if client:
|
||||||
try:
|
client.stop()
|
||||||
client.loop_stop()
|
del client
|
||||||
except Exception as e:
|
|
||||||
self.logger.warning(
|
|
||||||
'Could not stop client loop: %s: %s', type(e).__name__, e
|
|
||||||
)
|
|
||||||
|
|
||||||
client.disconnect()
|
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def subscribe(self, topic: str, **mqtt_kwargs):
|
def subscribe(self, topic: str, **mqtt_kwargs):
|
||||||
|
@ -473,7 +464,21 @@ class MqttPlugin(RunnablePlugin):
|
||||||
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
|
:param mqtt_kwargs: MQTT broker configuration (host, port, username,
|
||||||
password etc.). See :meth:`.__init__` parameters.
|
password etc.). See :meth:`.__init__` parameters.
|
||||||
"""
|
"""
|
||||||
self._get_client(**mqtt_kwargs).unsubscribe(topic)
|
client_id = self._get_client_id(
|
||||||
|
topics=(topic,),
|
||||||
|
**mqtt_kwargs,
|
||||||
|
)
|
||||||
|
|
||||||
|
with self._listeners_lock[client_id]:
|
||||||
|
client = self.listeners.get(client_id)
|
||||||
|
|
||||||
|
if not client:
|
||||||
|
self.logger.info('No subscriptions found for topic %s', topic)
|
||||||
|
return
|
||||||
|
|
||||||
|
client.unsubscribe(topic)
|
||||||
|
client.stop()
|
||||||
|
del client
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]):
|
def _response_callback(reply_topic: str, event: threading.Event, buffer: IO[bytes]):
|
||||||
|
@ -527,6 +532,7 @@ class MqttPlugin(RunnablePlugin):
|
||||||
for listener in self.listeners.values():
|
for listener in self.listeners.values():
|
||||||
try:
|
try:
|
||||||
listener.join(timeout=1)
|
listener.join(timeout=1)
|
||||||
|
del listener
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
|
@ -236,6 +236,11 @@ class MqttClient(mqtt.Client, threading.Thread):
|
||||||
if not self.is_alive():
|
if not self.is_alive():
|
||||||
return
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.loop_stop()
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.debug('Could not stop client loop: %s: %s', type(e).__name__, e)
|
||||||
|
|
||||||
self._stop_scheduled = True
|
self._stop_scheduled = True
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
self._running = False
|
self._running = False
|
||||||
|
|
Loading…
Reference in a new issue