From 8a84a36905c7e146a24d13cd4109a7fac0a77dab Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <fabio@manganiello.tech>
Date: Fri, 19 Jan 2024 03:11:55 +0100
Subject: [PATCH 1/2] [#351] Merged `google.pubsub` plugin and backend.

Closes: #351
---
 docs/source/backends.rst                      |   1 -
 .../platypush/backend/google.pubsub.rst       |   5 -
 platypush/backend/google/__init__.py          |   0
 platypush/backend/google/pubsub/__init__.py   |  88 ---------
 platypush/backend/google/pubsub/manifest.yaml |   9 -
 platypush/plugins/google/pubsub/__init__.py   | 171 +++++++++++++++---
 platypush/plugins/google/pubsub/manifest.yaml |   3 +-
 platypush/utils/mock/modules.py               |   3 +
 8 files changed, 150 insertions(+), 130 deletions(-)
 delete mode 100644 docs/source/platypush/backend/google.pubsub.rst
 delete mode 100644 platypush/backend/google/__init__.py
 delete mode 100644 platypush/backend/google/pubsub/__init__.py
 delete mode 100644 platypush/backend/google/pubsub/manifest.yaml

diff --git a/docs/source/backends.rst b/docs/source/backends.rst
index 3a152791..8b29fb7f 100644
--- a/docs/source/backends.rst
+++ b/docs/source/backends.rst
@@ -10,7 +10,6 @@ Backends
     platypush/backend/button.flic.rst
     platypush/backend/camera.pi.rst
     platypush/backend/chat.telegram.rst
-    platypush/backend/google.pubsub.rst
     platypush/backend/gps.rst
     platypush/backend/http.rst
     platypush/backend/mail.rst
diff --git a/docs/source/platypush/backend/google.pubsub.rst b/docs/source/platypush/backend/google.pubsub.rst
deleted file mode 100644
index 76bffd3c..00000000
--- a/docs/source/platypush/backend/google.pubsub.rst
+++ /dev/null
@@ -1,5 +0,0 @@
-``google.pubsub``
-===================================
-
-.. automodule:: platypush.backend.google.pubsub
-    :members:
diff --git a/platypush/backend/google/__init__.py b/platypush/backend/google/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/platypush/backend/google/pubsub/__init__.py b/platypush/backend/google/pubsub/__init__.py
deleted file mode 100644
index 39a6d3e1..00000000
--- a/platypush/backend/google/pubsub/__init__.py
+++ /dev/null
@@ -1,88 +0,0 @@
-import json
-
-from typing import Optional, List
-
-from platypush.backend import Backend
-from platypush.context import get_plugin
-from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
-
-
-class GooglePubsubBackend(Backend):
-    """
-    Subscribe to a list of topics on a Google Pub/Sub instance. See
-    :class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your
-    project and credentials file.
-    """
-
-    def __init__(
-        self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs
-    ):
-        """
-        :param topics: List of topics to subscribe. You can either specify the full topic name in the format
-            ``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
-            Google Pub/Sub project, or just ``<topic_name>``  - in such case it's implied that you refer to the
-            ``topic_name`` under the ``project_id`` of your service credentials.
-        :param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the
-            ``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
-        """
-
-        super().__init__(*args, name='GooglePubSub', **kwargs)
-        self.topics = topics
-
-        if credentials_file:
-            self.credentials_file = credentials_file
-        else:
-            plugin = self._get_plugin()
-            self.credentials_file = plugin.credentials_file
-
-    @staticmethod
-    def _get_plugin():
-        plugin = get_plugin('google.pubsub')
-        assert plugin, 'google.pubsub plugin not enabled'
-        return plugin
-
-    def _message_callback(self, topic):
-        def callback(msg):
-            data = msg.data.decode()
-            try:
-                data = json.loads(data)
-            except Exception as e:
-                self.logger.debug('Not a valid JSON: %s: %s', data, e)
-
-            msg.ack()
-            self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
-
-        return callback
-
-    def run(self):
-        # noinspection PyPackageRequirements
-        from google.cloud import pubsub_v1
-
-        # noinspection PyPackageRequirements
-        from google.api_core.exceptions import AlreadyExists
-
-        super().run()
-        plugin = self._get_plugin()
-        project_id = plugin.get_project_id()
-        credentials = plugin.get_credentials(plugin.subscriber_audience)
-        subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
-
-        for topic in self.topics:
-            prefix = f'projects/{project_id}/topics/'
-            if not topic.startswith(prefix):
-                topic = f'{prefix}{topic}'
-            subscription_name = '/'.join(
-                [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
-            )
-
-            try:
-                subscriber.create_subscription(name=subscription_name, topic=topic)
-            except AlreadyExists:
-                pass
-
-            subscriber.subscribe(subscription_name, self._message_callback(topic))
-
-        self.wait_stop()
-
-
-# vim:sw=4:ts=4:et:
diff --git a/platypush/backend/google/pubsub/manifest.yaml b/platypush/backend/google/pubsub/manifest.yaml
deleted file mode 100644
index c7fd5c41..00000000
--- a/platypush/backend/google/pubsub/manifest.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-manifest:
-  events:
-    platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message
-      is received ona subscribed topic.
-  install:
-    pip:
-    - google-cloud-pubsub
-  package: platypush.backend.google.pubsub
-  type: backend
diff --git a/platypush/plugins/google/pubsub/__init__.py b/platypush/plugins/google/pubsub/__init__.py
index 1834844d..7b66b6be 100644
--- a/platypush/plugins/google/pubsub/__init__.py
+++ b/platypush/plugins/google/pubsub/__init__.py
@@ -1,59 +1,87 @@
 import json
 import os
+from threading import RLock
+from typing import Iterable, Optional
 
-from platypush.plugins import Plugin, action
+from google.auth import jwt
+from google.cloud import pubsub_v1 as pubsub  # pylint: disable=no-name-in-module
+from google.api_core.exceptions import AlreadyExists, NotFound
+
+from platypush.config import Config
+from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
+from platypush.plugins import RunnablePlugin, action
 
 
-class GooglePubsubPlugin(Plugin):
+class GooglePubsubPlugin(RunnablePlugin):
     """
-    Send messages over a Google pub/sub instance.
-    You'll need a Google Cloud active project and a set of credentials to use this plugin:
+    Publishes and subscribes to Google Pub/Sub topics.
+
+    You'll need a Google Cloud active project and a set of credentials to use
+    this plugin:
 
         1. Create a project on the `Google Cloud console
-        <https://console.cloud.google.com/projectcreate>`_ if you don't have
-        one already.
+           <https://console.cloud.google.com/projectcreate>`_ if you don't have
+           one already.
 
         2. In the `Google Cloud API console
-        <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
-        create a new service account key. Select "New Service Account", choose
-        the role "Pub/Sub Editor" and leave the key type as JSON.
+           <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
+           create a new service account key. Select "New Service Account", choose
+           the role "Pub/Sub Editor" and leave the key type as JSON.
 
         3. Download the JSON service credentials file. By default Platypush
-        will look for the credentials file under
-        ``~/.credentials/platypush/google/pubsub.json``.
+           will look for the credentials file under
+           ``<WORKDIR>/credentials/google/pubsub.json``.
 
     """
 
     publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
     subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
     default_credentials_file = os.path.join(
-        os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json'
+        Config.get_workdir(), 'credentials', 'google', 'pubsub.json'
     )
 
-    def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
+    def __init__(
+        self,
+        credentials_file: str = default_credentials_file,
+        topics: Iterable[str] = (),
+        **kwargs,
+    ):
         """
         :param credentials_file: Path to the JSON credentials file for Google
             pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
+        :param topics: List of topics to subscribe. You can either specify the
+            full topic name in the format
+            ``projects/<project_id>/topics/<topic_name>``, where
+            ``<project_id>`` must be the ID of your Google Pub/Sub project, or
+            just ``<topic_name>`` - in such case it's implied that you refer to
+            the ``topic_name`` under the ``project_id`` of your service
+            credentials.
         """
         super().__init__(**kwargs)
         self.credentials_file = credentials_file
         self.project_id = self.get_project_id()
+        self.topics = topics
+        self._subscriber: Optional[pubsub.SubscriberClient] = None
+        self._subscriber_lock = RLock()
 
     def get_project_id(self):
         with open(self.credentials_file) as f:
             return json.load(f).get('project_id')
 
     def get_credentials(self, audience: str):
-        from google.auth import jwt
-
         return jwt.Credentials.from_service_account_file(
             self.credentials_file, audience=audience
         )
 
+    def _norm_topic(self, topic: str):
+        if not topic.startswith(f'projects/{self.project_id}/topics/'):
+            topic = f'projects/{self.project_id}/topics/{topic}'
+        return topic
+
     @action
-    def send_message(self, topic: str, msg, **kwargs):
+    def publish(self, topic: str, msg, **kwargs):
         """
-        Sends a message to a topic
+        Publish a message to a topic
 
         :param topic: Topic/channel where the message will be delivered. You
             can either specify the full topic name in the format
@@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin):
         :param msg: Message to be sent. It can be a list, a dict, or a Message object
         :param kwargs: Extra arguments to be passed to .publish()
         """
-        from google.cloud import pubsub_v1
-        from google.api_core.exceptions import AlreadyExists
-
         credentials = self.get_credentials(self.publisher_audience)
-        publisher = pubsub_v1.PublisherClient(credentials=credentials)
-
-        if not topic.startswith(f'projects/{self.project_id}/topics/'):
-            topic = f'projects/{self.project_id}/topics/{topic}'
+        publisher = pubsub.PublisherClient(credentials=credentials)
+        topic = self._norm_topic(topic)
 
         try:
-            publisher.create_topic(topic)
+            publisher.create_topic(name=topic)
         except AlreadyExists:
-            pass
+            self.logger.debug('Topic %s already exists', topic)
 
         if isinstance(msg, (int, float)):
             msg = str(msg)
@@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin):
 
         publisher.publish(topic, msg, **kwargs)
 
+    @action
+    def send_message(self, topic: str, msg, **kwargs):
+        """
+        Alias for :meth:`.publish`
+        """
+        self.publish(topic=topic, msg=msg, **kwargs)
+
+    @action
+    def subscribe(self, topic: str):
+        """
+        Subscribe to a topic.
+
+        :param topic: Topic/channel where the message will be delivered. You
+            can either specify the full topic name in the format
+            ``projects/<project_id>/topics/<topic_name>``, where
+            ``<project_id>`` must be the ID of your Google Pub/Sub project, or
+            just ``<topic_name>`` - in such case it's implied that you refer to
+            the ``topic_name`` under the ``project_id`` of your service
+            credentials.
+        """
+        assert self._subscriber, 'Subscriber not initialized'
+        topic = self._norm_topic(topic)
+        subscription_name = '/'.join(
+            [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
+        )
+
+        try:
+            self._subscriber.create_subscription(name=subscription_name, topic=topic)
+        except AlreadyExists:
+            self.logger.debug('Subscription %s already exists', subscription_name)
+
+        self._subscriber.subscribe(subscription_name, self._message_callback(topic))
+
+    @action
+    def unsubscribe(self, topic: str):
+        """
+        Unsubscribe from a topic.
+
+        :param topic: Topic/channel where the message will be delivered. You
+            can either specify the full topic name in the format
+            ``projects/<project_id>/topics/<topic_name>``, where
+            ``<project_id>`` must be the ID of your Google Pub/Sub project, or
+            just ``<topic_name>`` - in such case it's implied that you refer to
+            the ``topic_name`` under the ``project_id`` of your service
+            credentials.
+        """
+        assert self._subscriber, 'Subscriber not initialized'
+        topic = self._norm_topic(topic)
+        subscription_name = '/'.join(
+            [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
+        )
+
+        try:
+            self._subscriber.delete_subscription(subscription=subscription_name)
+        except NotFound:
+            self.logger.debug('Subscription %s not found', subscription_name)
+
+    def _message_callback(self, topic):
+        def callback(msg):
+            data = msg.data.decode()
+            try:
+                data = json.loads(data)
+            except Exception as e:
+                self.logger.debug('Not a valid JSON: %s: %s', data, e)
+
+            msg.ack()
+            self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
+
+        return callback
+
+    def main(self):
+        credentials = self.get_credentials(self.subscriber_audience)
+        with self._subscriber_lock:
+            self._subscriber = pubsub.SubscriberClient(credentials=credentials)
+
+        for topic in self.topics:
+            self.subscribe(topic=topic)
+
+        self.wait_stop()
+        with self._subscriber_lock:
+            self._close()
+
+    def _close(self):
+        with self._subscriber_lock:
+            if self._subscriber:
+                try:
+                    self._subscriber.close()
+                except Exception as e:
+                    self.logger.debug('Error while closing the subscriber: %s', e)
+
+                self._subscriber = None
+
+    def stop(self):
+        self._close()
+        super().stop()
+
 
 # vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/google/pubsub/manifest.yaml b/platypush/plugins/google/pubsub/manifest.yaml
index 360fba64..b16e30bc 100644
--- a/platypush/plugins/google/pubsub/manifest.yaml
+++ b/platypush/plugins/google/pubsub/manifest.yaml
@@ -1,5 +1,6 @@
 manifest:
-  events: {}
+  events:
+    - platypush.message.event.google.pubsub.GooglePubsubMessageEvent
   install:
     apk:
       - py3-google-api-python-client
diff --git a/platypush/utils/mock/modules.py b/platypush/utils/mock/modules.py
index 1a192170..6af74355 100644
--- a/platypush/utils/mock/modules.py
+++ b/platypush/utils/mock/modules.py
@@ -35,10 +35,13 @@ mock_imports = [
     "gi",
     "gi.repository",
     "google",
+    "google.api_core",
+    "google.auth",
     "google.assistant.embedded",
     "google.assistant.library",
     "google.assistant.library.event",
     "google.assistant.library.file_helpers",
+    "google.cloud",
     "google.oauth2.credentials",
     "googlesamples",
     "googlesamples.assistant.grpc.audio_helpers",

From d82a5ecb1e75f7ad24bbef6143b7119477c2ae0e Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <fabio@manganiello.tech>
Date: Fri, 19 Jan 2024 21:54:49 +0100
Subject: [PATCH 2/2] [#356] Merged `adafruit.io` plugin and backend.

---
 docs/source/backends.rst                      |   1 -
 docs/source/platypush/backend/adafruit.io.rst |   6 -
 platypush/backend/adafruit/__init__.py        |   0
 platypush/backend/adafruit/io/__init__.py     |  97 ------
 platypush/backend/adafruit/io/manifest.yaml   |  12 -
 platypush/message/event/adafruit.py           |  13 +-
 platypush/plugins/adafruit/io/__init__.py     | 327 +++++++++++-------
 platypush/plugins/adafruit/io/manifest.yaml   |   5 +-
 8 files changed, 211 insertions(+), 250 deletions(-)
 delete mode 100644 docs/source/platypush/backend/adafruit.io.rst
 delete mode 100644 platypush/backend/adafruit/__init__.py
 delete mode 100644 platypush/backend/adafruit/io/__init__.py
 delete mode 100644 platypush/backend/adafruit/io/manifest.yaml

diff --git a/docs/source/backends.rst b/docs/source/backends.rst
index 8b29fb7f..1d1612ab 100644
--- a/docs/source/backends.rst
+++ b/docs/source/backends.rst
@@ -6,7 +6,6 @@ Backends
     :maxdepth: 1
     :caption: Backends:
 
-    platypush/backend/adafruit.io.rst
     platypush/backend/button.flic.rst
     platypush/backend/camera.pi.rst
     platypush/backend/chat.telegram.rst
diff --git a/docs/source/platypush/backend/adafruit.io.rst b/docs/source/platypush/backend/adafruit.io.rst
deleted file mode 100644
index d4bd4cc4..00000000
--- a/docs/source/platypush/backend/adafruit.io.rst
+++ /dev/null
@@ -1,6 +0,0 @@
-``adafruit.io``
-=================================
-
-.. automodule:: platypush.backend.adafruit.io
-	:members:
-
diff --git a/platypush/backend/adafruit/__init__.py b/platypush/backend/adafruit/__init__.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/platypush/backend/adafruit/io/__init__.py b/platypush/backend/adafruit/io/__init__.py
deleted file mode 100644
index a22856ae..00000000
--- a/platypush/backend/adafruit/io/__init__.py
+++ /dev/null
@@ -1,97 +0,0 @@
-from typing import Optional
-
-from platypush.backend import Backend
-from platypush.context import get_plugin
-from platypush.message.event.adafruit import (
-    ConnectedEvent,
-    DisconnectedEvent,
-    FeedUpdateEvent,
-)
-
-
-class AdafruitIoBackend(Backend):
-    """
-    Backend that listens to messages received over the Adafruit IO message queue
-
-    Requires:
-
-        * The :class:`platypush.plugins.adafruit.io.AdafruitIoPlugin` plugin to
-            be active and configured.
-    """
-
-    def __init__(self, feeds, *args, **kwargs):
-        """
-        :param feeds: List of feed IDs to monitor
-        :type feeds: list[str]
-        """
-
-        super().__init__(*args, **kwargs)
-        from Adafruit_IO import MQTTClient
-
-        self.feeds = feeds
-        self._client: Optional[MQTTClient] = None
-
-    def _init_client(self):
-        if self._client:
-            return
-
-        from Adafruit_IO import MQTTClient
-
-        plugin = get_plugin('adafruit.io')
-        if not plugin:
-            raise RuntimeError('Adafruit IO plugin not configured')
-
-        # noinspection PyProtectedMember
-        self._client = MQTTClient(plugin._username, plugin._key)
-        self._client.on_connect = self.on_connect()
-        self._client.on_disconnect = self.on_disconnect()
-        self._client.on_message = self.on_message()
-
-    def on_connect(self):
-        def _handler(client):
-            for feed in self.feeds:
-                client.subscribe(feed)
-            self.bus.post(ConnectedEvent())
-
-        return _handler
-
-    def on_disconnect(self):
-        def _handler(*_, **__):
-            self.bus.post(DisconnectedEvent())
-
-        return _handler
-
-    def on_message(self, *_, **__):
-        # noinspection PyUnusedLocal
-        def _handler(client, feed, data):
-            try:
-                data = float(data)
-            except Exception as e:
-                self.logger.debug('Not a number: {}: {}'.format(data, e))
-
-            self.bus.post(FeedUpdateEvent(feed=feed, data=data))
-
-        return _handler
-
-    def run(self):
-        super().run()
-
-        self.logger.info(
-            ('Initialized Adafruit IO backend, listening on ' + 'feeds {}').format(
-                self.feeds
-            )
-        )
-
-        while not self.should_stop():
-            try:
-                self._init_client()
-                # noinspection PyUnresolvedReferences
-                self._client.connect()
-                # noinspection PyUnresolvedReferences
-                self._client.loop_blocking()
-            except Exception as e:
-                self.logger.exception(e)
-                self._client = None
-
-
-# vim:sw=4:ts=4:et:
diff --git a/platypush/backend/adafruit/io/manifest.yaml b/platypush/backend/adafruit/io/manifest.yaml
deleted file mode 100644
index e019ab9c..00000000
--- a/platypush/backend/adafruit/io/manifest.yaml
+++ /dev/null
@@ -1,12 +0,0 @@
-manifest:
-  events:
-    platypush.message.event.adafruit.ConnectedEvent: when thebackend connects to the
-      Adafruit queue
-    platypush.message.event.adafruit.DisconnectedEvent: when thebackend disconnects
-      from the Adafruit queue
-    platypush.message.event.adafruit.FeedUpdateEvent: when anupdate event is received
-      on a monitored feed
-  install:
-    pip: []
-  package: platypush.backend.adafruit.io
-  type: backend
diff --git a/platypush/message/event/adafruit.py b/platypush/message/event/adafruit.py
index fea8e2a0..341c24fe 100644
--- a/platypush/message/event/adafruit.py
+++ b/platypush/message/event/adafruit.py
@@ -1,27 +1,28 @@
 from platypush.message.event import Event
 
 
-class ConnectedEvent(Event):
+class AdafruitConnectedEvent(Event):
     """
-    Event triggered when the backend connects to the Adafruit message queue
+    Event triggered when the backend connects to the Adafruit message queue.
     """
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
 
-class DisconnectedEvent(Event):
+class AdafruitDisconnectedEvent(Event):
     """
-    Event triggered when the backend disconnects from the Adafruit message queue
+    Event triggered when the backend disconnects from the Adafruit message
+    queue.
     """
 
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
 
 
-class FeedUpdateEvent(Event):
+class AdafruitFeedUpdateEvent(Event):
     """
-    Event triggered upon Adafruit IO feed update
+    Event triggered when a message is received on a subscribed Adafruit feed.
     """
 
     def __init__(self, feed, data, *args, **kwargs):
diff --git a/platypush/plugins/adafruit/io/__init__.py b/platypush/plugins/adafruit/io/__init__.py
index 31dcbfae..23d08de3 100644
--- a/platypush/plugins/adafruit/io/__init__.py
+++ b/platypush/plugins/adafruit/io/__init__.py
@@ -1,26 +1,30 @@
-import ast
 import statistics
-import json
 import time
 
-from threading import Thread, Lock
+from queue import Empty, Queue
+from threading import Thread
+from typing import Iterable, Optional, Union
 
-from platypush.context import get_backend
-from platypush.plugins import Plugin, action
-
-data_throttler_lock = None
+from platypush.message.event.adafruit import (
+    AdafruitConnectedEvent,
+    AdafruitDisconnectedEvent,
+    AdafruitFeedUpdateEvent,
+)
+from platypush.plugins import RunnablePlugin, action
 
 
-class AdafruitIoPlugin(Plugin):
+class AdafruitIoPlugin(RunnablePlugin):
     """
-    This plugin allows you to interact with the Adafruit IO
-    <https://io.adafruit.com>, a cloud-based message queue and storage.
-    You can send values to feeds on your Adafruit IO account and read the
-    values of those feeds as well through any device.
+    This plugin allows you to interact with `Adafruit IO
+    <https://io.adafruit.com>`_, a cloud-based message queue and storage. You
+    can use this plugin to send and receive data to topics connected to your
+    Adafruit IO account.
 
-    Some example usages::
+    Some example usages:
 
-        # Send the temperature value for a connected sensor to the "temperature" feed
+      .. code-block:: javascript
+
+        // Send the temperature value for a connected sensor to the "temperature" feed
         {
             "type": "request",
             "action": "adafruit.io.send",
@@ -30,7 +34,7 @@ class AdafruitIoPlugin(Plugin):
             }
         }
 
-        # Receive the most recent temperature value
+        // Receive the most recent temperature value
         {
             "type": "request",
             "action": "adafruit.io.receive",
@@ -38,164 +42,194 @@ class AdafruitIoPlugin(Plugin):
                 "feed": "temperature"
             }
         }
+
     """
 
-    _DATA_THROTTLER_QUEUE = 'platypush/adafruit.io'
-
-    def __init__(self, username, key, throttle_seconds=None, **kwargs):
+    def __init__(
+        self,
+        username: str,
+        key: str,
+        feeds: Iterable[str] = (),
+        throttle_interval: Optional[float] = None,
+        **kwargs
+    ):
         """
         :param username: Your Adafruit username
-        :type username: str
-
         :param key: Your Adafruit IO key
-        :type key: str
-
-        :param throttle_seconds: If set, then instead of sending the values directly over ``send`` the plugin will
-            first collect all the samples within the specified period and then dispatch them to Adafruit IO.
-            You may want to set it if you have data sources providing a lot of data points and you don't want to hit
-            the throttling limitations of Adafruit.
-        :type throttle_seconds: float
+        :param feeds: List of feeds to subscribe to. If not set, then the plugin
+            will not subscribe to any feed unless instructed to do so, neither
+            it will emit any event.
+        :param throttle_interval: If set, then instead of sending the values
+            directly over ``send`` the plugin will first collect all the
+            samples within the specified period and then dispatch them to
+            Adafruit IO. You may want to set it if you have data sources
+            providing a lot of data points and you don't want to hit the
+            throttling limitations of Adafruit.
         """
 
-        from Adafruit_IO import Client
+        from Adafruit_IO import Client, MQTTClient
 
-        global data_throttler_lock
         super().__init__(**kwargs)
 
         self._username = username
         self._key = key
+        self.feeds = feeds
         self.aio = Client(username=username, key=key)
-        self.throttle_seconds = throttle_seconds
+        self._mqtt_client: Optional[MQTTClient] = None
+        self.throttle_interval = throttle_interval
+        self._data_throttler_thread: Optional[Thread] = None
+        self._data_throttler_queue = Queue()
 
-        if not data_throttler_lock:
-            data_throttler_lock = Lock()
+    @property
+    def _mqtt(self):
+        if not self._mqtt_client:
+            from Adafruit_IO import MQTTClient
 
-        if self.throttle_seconds and not data_throttler_lock.locked():
-            self._get_redis()
-            self.logger.info('Starting Adafruit IO throttler thread')
-            data_throttler_lock.acquire(False)
-            self.data_throttler = Thread(target=self._data_throttler())
-            self.data_throttler.start()
+            self._mqtt_client = MQTTClient(self._username, self._key)
+            self._mqtt_client.on_connect = self._on_connect  # type: ignore
+            self._mqtt_client.on_disconnect = self._on_disconnect  # type: ignore
+            self._mqtt_client.on_message = self._on_message  # type: ignore
 
-    @staticmethod
-    def _get_redis():
-        from redis import Redis
+        return self._mqtt_client
 
-        redis_args = {
-            'host': 'localhost',
-        }
+    def _on_connect(self, *_, **__):
+        assert self._mqtt_client, 'MQTT client not initialized'
+        for feed in self.feeds:
+            self._mqtt_client.subscribe(feed)
+        self._bus.post(AdafruitConnectedEvent())
 
-        redis_backend = get_backend('redis')
-        if redis_backend:
-            redis_args = get_backend('redis').redis_args
-        redis_args['socket_timeout'] = 1
-        return Redis(**redis_args)
+    def _on_disconnect(self, *_, **__):
+        self._bus.post(AdafruitDisconnectedEvent())
+
+    def _on_message(self, _, feed, data, *__):
+        try:
+            data = float(data)
+        except (TypeError, ValueError):
+            pass
+
+        self._bus.post(AdafruitFeedUpdateEvent(feed=feed, data=data))
+
+    def _subscribe(self, feed: str):
+        assert self._mqtt_client, 'MQTT client not initialized'
+        self._mqtt_client.subscribe(feed)
+
+    def _unsubscribe(self, feed: str):
+        assert self._mqtt_client, 'MQTT client not initialized'
+        self._mqtt_client.unsubscribe(feed)
 
     def _data_throttler(self):
-        from redis.exceptions import TimeoutError as QueueTimeoutError
+        from Adafruit_IO import ThrottlingError
 
-        def run():
-            from Adafruit_IO import ThrottlingError
+        if not self.throttle_interval:
+            return
 
-            redis = self._get_redis()
-            last_processed_batch_timestamp = None
-            data = {}
+        last_processed_batch_timestamp = None
+        data = {}
 
-            try:
-                while True:
-                    try:
-                        new_data = ast.literal_eval(
-                            redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8')
-                        )
+        try:
+            while not self.should_stop():
+                try:
+                    new_data = self._data_throttler_queue.get(timeout=1)
+                    for key, value in new_data.items():
+                        data.setdefault(key, []).append(value)
+                except Empty:
+                    continue
 
-                        for (key, value) in new_data.items():
-                            data.setdefault(key, []).append(value)
-                    except QueueTimeoutError:
-                        pass
+                should_process = data and (
+                    last_processed_batch_timestamp is None
+                    or time.time() - last_processed_batch_timestamp
+                    >= self.throttle_interval
+                )
 
-                    if data and (
-                        last_processed_batch_timestamp is None
-                        or time.time() - last_processed_batch_timestamp
-                        >= self.throttle_seconds
-                    ):
-                        last_processed_batch_timestamp = time.time()
-                        self.logger.info('Processing feeds batch for Adafruit IO')
+                if not should_process:
+                    continue
 
-                        for (feed, values) in data.items():
-                            if values:
-                                value = statistics.mean(values)
+                last_processed_batch_timestamp = time.time()
+                self.logger.info('Processing feeds batch for Adafruit IO')
 
-                                try:
-                                    self.send(feed, value, enqueue=False)
-                                except ThrottlingError:
-                                    self.logger.warning(
-                                        'Adafruit IO throttling threshold hit, taking a nap '
-                                        + 'before retrying'
-                                    )
-                                    time.sleep(self.throttle_seconds)
+                for feed, values in data.items():
+                    if values:
+                        value = statistics.mean(values)
 
-                        data = {}
-            except Exception as e:
-                self.logger.exception(e)
+                        try:
+                            self.send(feed, value, enqueue=False)
+                        except ThrottlingError:
+                            self.logger.warning(
+                                'Adafruit IO throttling threshold hit, taking a nap '
+                                + 'before retrying'
+                            )
+                            self.wait_stop(self.throttle_interval)
 
-        return run
+                data = {}
+        except Exception as e:
+            self.logger.exception(e)
 
     @action
-    def send(self, feed, value, enqueue=True):
+    def send(self, feed: str, value: Union[int, float, str], enqueue: bool = True):
         """
-        Send a value to an Adafruit IO feed
+        Send a value to an Adafruit IO feed.
 
-        :param feed: Feed name
-        :type feed: str
-
-        :param value: Value to send
-        :type value: Numeric or string
-
-        :param enqueue: If throttle_seconds is set, this method by default will append values to the throttling queue
+        :param feed: Feed name.
+        :param value: Value to send.
+        :param enqueue: If throttle_interval is set, this method by default will append values to the throttling queue
             to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to
             override the behaviour and send the message directly instead.
-        :type enqueue: bool
         """
 
-        if not self.throttle_seconds or not enqueue:
+        if not self.throttle_interval or not enqueue:
             # If no throttling is configured, or enqueue is false then send the value directly to Adafruit
             self.aio.send(feed, value)
         else:
-            # Otherwise send it to the Redis queue to be picked up by the throttler thread
-            redis = self._get_redis()
-            redis.rpush(self._DATA_THROTTLER_QUEUE, json.dumps({feed: value}))
+            # Otherwise send it to the queue to be picked up by the throttler thread
+            self._data_throttler_queue.put({feed: value})
 
     @action
-    def send_location_data(self, feed, lat, lon, ele, value):
+    def send_location_data(
+        self,
+        feed: str,
+        latitude: float,
+        longitude: float,
+        elevation: float,
+        value: Optional[Union[int, float, str]] = None,
+    ):
         """
         Send location data to an Adafruit IO feed
 
         :param feed: Feed name
-        :type feed: str
-
         :param lat: Latitude
-        :type lat: float
-
         :param lon: Longitude
-        :type lon: float
-
         :param ele: Elevation
-        :type ele: float
-
-        :param value: Value to send
-        :type value: Numeric or string
+        :param value: Extra value to attach to the record
         """
 
         self.aio.send_data(
             feed=feed,
             value=value,
             metadata={
-                'lat': lat,
-                'lon': lon,
-                'ele': ele,
+                'lat': latitude,
+                'lon': longitude,
+                'ele': elevation,
             },
         )
 
+    @action
+    def subscribe(self, feed: str):
+        """
+        Subscribe to a feed.
+
+        :param feed: Feed name
+        """
+        self._subscribe(feed)
+
+    @action
+    def unsubscribe(self, feed: str):
+        """
+        Unsubscribe from a feed.
+
+        :param feed: Feed name
+        """
+        self._unsubscribe(feed)
+
     @classmethod
     def _cast_value(cls, value):
         try:
@@ -220,17 +254,14 @@ class AdafruitIoPlugin(Plugin):
         ]
 
     @action
-    def receive(self, feed, limit=1):
+    def receive(self, feed: str, limit: int = 1):
         """
         Receive data from the specified Adafruit IO feed
 
         :param feed: Feed name
-        :type feed: str
-
         :param limit: Maximum number of data points to be returned. If None,
             all the values in the feed will be returned. Default: 1 (return most
             recent value)
-        :type limit: int
         """
 
         if limit == 1:
@@ -241,42 +272,84 @@ class AdafruitIoPlugin(Plugin):
         return values[:limit] if limit else values
 
     @action
-    def receive_next(self, feed):
+    def receive_next(self, feed: str):
         """
         Receive the next unprocessed data point from a feed
 
         :param feed: Feed name
-        :type feed: str
         """
 
         values = self._convert_data_to_dict(self.aio.receive_next(feed))
         return values[0] if values else None
 
     @action
-    def receive_previous(self, feed):
+    def receive_previous(self, feed: str):
         """
         Receive the last processed data point from a feed
 
         :param feed: Feed name
-        :type feed: str
         """
 
         values = self._convert_data_to_dict(self.aio.receive_previous(feed))
         return values[0] if values else None
 
     @action
-    def delete(self, feed, data_id):
+    def delete(self, feed: str, data_id: str):
         """
         Delete a data point from a feed
 
         :param feed: Feed name
-        :type feed: str
-
         :param data_id: Data point ID to remove
-        :type data_id: str
         """
 
         self.aio.delete(feed, data_id)
 
+    def main(self):
+        if self.throttle_interval:
+            self._data_throttler_thread = Thread(target=self._data_throttler)
+            self._data_throttler_thread.start()
+
+        try:
+            while not self.should_stop():
+                cur_wait = 1
+                max_wait = 60
+
+                try:
+                    self._mqtt.connect()
+                    cur_wait = 1
+                    self._mqtt.loop_blocking()
+                except Exception as e:
+                    self.logger.warning(
+                        'Adafruit IO connection error: %s, retrying in %d seconds',
+                        e,
+                        cur_wait,
+                    )
+
+                    self.logger.exception(e)
+                    self.wait_stop(cur_wait)
+                    cur_wait = min(cur_wait * 2, max_wait)
+                finally:
+                    self._stop_mqtt()
+        finally:
+            self._stop_data_throttler()
+
+    def _stop_mqtt(self):
+        if self._mqtt_client:
+            self._mqtt_client.disconnect()
+            self._mqtt_client = None
+
+    def _stop_data_throttler(self):
+        if self._data_throttler_thread:
+            self._data_throttler_thread.join()
+            self._data_throttler_thread = None
+
+    def _stop(self):
+        self._stop_mqtt()
+        self._stop_data_throttler()
+
+    def stop(self):
+        self._stop()
+        super().stop()
+
 
 # vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/adafruit/io/manifest.yaml b/platypush/plugins/adafruit/io/manifest.yaml
index b346fa3c..968826cc 100644
--- a/platypush/plugins/adafruit/io/manifest.yaml
+++ b/platypush/plugins/adafruit/io/manifest.yaml
@@ -1,5 +1,8 @@
 manifest:
-  events: {}
+  events:
+    - platypush.message.event.adafruit.AdafruitConnectedEvent
+    - platypush.message.event.adafruit.AdafruitDisconnectedEvent
+    - platypush.message.event.adafruit.AdafruitFeedUpdateEvent
   install:
     pip:
     - adafruit-io