diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 8b29fb7f..1d1612ab 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -6,7 +6,6 @@ Backends :maxdepth: 1 :caption: Backends: - platypush/backend/adafruit.io.rst platypush/backend/button.flic.rst platypush/backend/camera.pi.rst platypush/backend/chat.telegram.rst diff --git a/docs/source/platypush/backend/adafruit.io.rst b/docs/source/platypush/backend/adafruit.io.rst deleted file mode 100644 index d4bd4cc4..00000000 --- a/docs/source/platypush/backend/adafruit.io.rst +++ /dev/null @@ -1,6 +0,0 @@ -``adafruit.io`` -================================= - -.. automodule:: platypush.backend.adafruit.io - :members: - diff --git a/platypush/backend/adafruit/__init__.py b/platypush/backend/adafruit/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/platypush/backend/adafruit/io/__init__.py b/platypush/backend/adafruit/io/__init__.py deleted file mode 100644 index a22856ae..00000000 --- a/platypush/backend/adafruit/io/__init__.py +++ /dev/null @@ -1,97 +0,0 @@ -from typing import Optional - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.adafruit import ( - ConnectedEvent, - DisconnectedEvent, - FeedUpdateEvent, -) - - -class AdafruitIoBackend(Backend): - """ - Backend that listens to messages received over the Adafruit IO message queue - - Requires: - - * The :class:`platypush.plugins.adafruit.io.AdafruitIoPlugin` plugin to - be active and configured. - """ - - def __init__(self, feeds, *args, **kwargs): - """ - :param feeds: List of feed IDs to monitor - :type feeds: list[str] - """ - - super().__init__(*args, **kwargs) - from Adafruit_IO import MQTTClient - - self.feeds = feeds - self._client: Optional[MQTTClient] = None - - def _init_client(self): - if self._client: - return - - from Adafruit_IO import MQTTClient - - plugin = get_plugin('adafruit.io') - if not plugin: - raise RuntimeError('Adafruit IO plugin not configured') - - # noinspection PyProtectedMember - self._client = MQTTClient(plugin._username, plugin._key) - self._client.on_connect = self.on_connect() - self._client.on_disconnect = self.on_disconnect() - self._client.on_message = self.on_message() - - def on_connect(self): - def _handler(client): - for feed in self.feeds: - client.subscribe(feed) - self.bus.post(ConnectedEvent()) - - return _handler - - def on_disconnect(self): - def _handler(*_, **__): - self.bus.post(DisconnectedEvent()) - - return _handler - - def on_message(self, *_, **__): - # noinspection PyUnusedLocal - def _handler(client, feed, data): - try: - data = float(data) - except Exception as e: - self.logger.debug('Not a number: {}: {}'.format(data, e)) - - self.bus.post(FeedUpdateEvent(feed=feed, data=data)) - - return _handler - - def run(self): - super().run() - - self.logger.info( - ('Initialized Adafruit IO backend, listening on ' + 'feeds {}').format( - self.feeds - ) - ) - - while not self.should_stop(): - try: - self._init_client() - # noinspection PyUnresolvedReferences - self._client.connect() - # noinspection PyUnresolvedReferences - self._client.loop_blocking() - except Exception as e: - self.logger.exception(e) - self._client = None - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/adafruit/io/manifest.yaml b/platypush/backend/adafruit/io/manifest.yaml deleted file mode 100644 index e019ab9c..00000000 --- a/platypush/backend/adafruit/io/manifest.yaml +++ /dev/null @@ -1,12 +0,0 @@ -manifest: - events: - platypush.message.event.adafruit.ConnectedEvent: when thebackend connects to the - Adafruit queue - platypush.message.event.adafruit.DisconnectedEvent: when thebackend disconnects - from the Adafruit queue - platypush.message.event.adafruit.FeedUpdateEvent: when anupdate event is received - on a monitored feed - install: - pip: [] - package: platypush.backend.adafruit.io - type: backend diff --git a/platypush/message/event/adafruit.py b/platypush/message/event/adafruit.py index fea8e2a0..341c24fe 100644 --- a/platypush/message/event/adafruit.py +++ b/platypush/message/event/adafruit.py @@ -1,27 +1,28 @@ from platypush.message.event import Event -class ConnectedEvent(Event): +class AdafruitConnectedEvent(Event): """ - Event triggered when the backend connects to the Adafruit message queue + Event triggered when the backend connects to the Adafruit message queue. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -class DisconnectedEvent(Event): +class AdafruitDisconnectedEvent(Event): """ - Event triggered when the backend disconnects from the Adafruit message queue + Event triggered when the backend disconnects from the Adafruit message + queue. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -class FeedUpdateEvent(Event): +class AdafruitFeedUpdateEvent(Event): """ - Event triggered upon Adafruit IO feed update + Event triggered when a message is received on a subscribed Adafruit feed. """ def __init__(self, feed, data, *args, **kwargs): diff --git a/platypush/plugins/adafruit/io/__init__.py b/platypush/plugins/adafruit/io/__init__.py index 31dcbfae..23d08de3 100644 --- a/platypush/plugins/adafruit/io/__init__.py +++ b/platypush/plugins/adafruit/io/__init__.py @@ -1,26 +1,30 @@ -import ast import statistics -import json import time -from threading import Thread, Lock +from queue import Empty, Queue +from threading import Thread +from typing import Iterable, Optional, Union -from platypush.context import get_backend -from platypush.plugins import Plugin, action - -data_throttler_lock = None +from platypush.message.event.adafruit import ( + AdafruitConnectedEvent, + AdafruitDisconnectedEvent, + AdafruitFeedUpdateEvent, +) +from platypush.plugins import RunnablePlugin, action -class AdafruitIoPlugin(Plugin): +class AdafruitIoPlugin(RunnablePlugin): """ - This plugin allows you to interact with the Adafruit IO - , a cloud-based message queue and storage. - You can send values to feeds on your Adafruit IO account and read the - values of those feeds as well through any device. + This plugin allows you to interact with `Adafruit IO + `_, a cloud-based message queue and storage. You + can use this plugin to send and receive data to topics connected to your + Adafruit IO account. - Some example usages:: + Some example usages: - # Send the temperature value for a connected sensor to the "temperature" feed + .. code-block:: javascript + + // Send the temperature value for a connected sensor to the "temperature" feed { "type": "request", "action": "adafruit.io.send", @@ -30,7 +34,7 @@ class AdafruitIoPlugin(Plugin): } } - # Receive the most recent temperature value + // Receive the most recent temperature value { "type": "request", "action": "adafruit.io.receive", @@ -38,164 +42,194 @@ class AdafruitIoPlugin(Plugin): "feed": "temperature" } } + """ - _DATA_THROTTLER_QUEUE = 'platypush/adafruit.io' - - def __init__(self, username, key, throttle_seconds=None, **kwargs): + def __init__( + self, + username: str, + key: str, + feeds: Iterable[str] = (), + throttle_interval: Optional[float] = None, + **kwargs + ): """ :param username: Your Adafruit username - :type username: str - :param key: Your Adafruit IO key - :type key: str - - :param throttle_seconds: If set, then instead of sending the values directly over ``send`` the plugin will - first collect all the samples within the specified period and then dispatch them to Adafruit IO. - You may want to set it if you have data sources providing a lot of data points and you don't want to hit - the throttling limitations of Adafruit. - :type throttle_seconds: float + :param feeds: List of feeds to subscribe to. If not set, then the plugin + will not subscribe to any feed unless instructed to do so, neither + it will emit any event. + :param throttle_interval: If set, then instead of sending the values + directly over ``send`` the plugin will first collect all the + samples within the specified period and then dispatch them to + Adafruit IO. You may want to set it if you have data sources + providing a lot of data points and you don't want to hit the + throttling limitations of Adafruit. """ - from Adafruit_IO import Client + from Adafruit_IO import Client, MQTTClient - global data_throttler_lock super().__init__(**kwargs) self._username = username self._key = key + self.feeds = feeds self.aio = Client(username=username, key=key) - self.throttle_seconds = throttle_seconds + self._mqtt_client: Optional[MQTTClient] = None + self.throttle_interval = throttle_interval + self._data_throttler_thread: Optional[Thread] = None + self._data_throttler_queue = Queue() - if not data_throttler_lock: - data_throttler_lock = Lock() + @property + def _mqtt(self): + if not self._mqtt_client: + from Adafruit_IO import MQTTClient - if self.throttle_seconds and not data_throttler_lock.locked(): - self._get_redis() - self.logger.info('Starting Adafruit IO throttler thread') - data_throttler_lock.acquire(False) - self.data_throttler = Thread(target=self._data_throttler()) - self.data_throttler.start() + self._mqtt_client = MQTTClient(self._username, self._key) + self._mqtt_client.on_connect = self._on_connect # type: ignore + self._mqtt_client.on_disconnect = self._on_disconnect # type: ignore + self._mqtt_client.on_message = self._on_message # type: ignore - @staticmethod - def _get_redis(): - from redis import Redis + return self._mqtt_client - redis_args = { - 'host': 'localhost', - } + def _on_connect(self, *_, **__): + assert self._mqtt_client, 'MQTT client not initialized' + for feed in self.feeds: + self._mqtt_client.subscribe(feed) + self._bus.post(AdafruitConnectedEvent()) - redis_backend = get_backend('redis') - if redis_backend: - redis_args = get_backend('redis').redis_args - redis_args['socket_timeout'] = 1 - return Redis(**redis_args) + def _on_disconnect(self, *_, **__): + self._bus.post(AdafruitDisconnectedEvent()) + + def _on_message(self, _, feed, data, *__): + try: + data = float(data) + except (TypeError, ValueError): + pass + + self._bus.post(AdafruitFeedUpdateEvent(feed=feed, data=data)) + + def _subscribe(self, feed: str): + assert self._mqtt_client, 'MQTT client not initialized' + self._mqtt_client.subscribe(feed) + + def _unsubscribe(self, feed: str): + assert self._mqtt_client, 'MQTT client not initialized' + self._mqtt_client.unsubscribe(feed) def _data_throttler(self): - from redis.exceptions import TimeoutError as QueueTimeoutError + from Adafruit_IO import ThrottlingError - def run(): - from Adafruit_IO import ThrottlingError + if not self.throttle_interval: + return - redis = self._get_redis() - last_processed_batch_timestamp = None - data = {} + last_processed_batch_timestamp = None + data = {} - try: - while True: - try: - new_data = ast.literal_eval( - redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8') - ) + try: + while not self.should_stop(): + try: + new_data = self._data_throttler_queue.get(timeout=1) + for key, value in new_data.items(): + data.setdefault(key, []).append(value) + except Empty: + continue - for (key, value) in new_data.items(): - data.setdefault(key, []).append(value) - except QueueTimeoutError: - pass + should_process = data and ( + last_processed_batch_timestamp is None + or time.time() - last_processed_batch_timestamp + >= self.throttle_interval + ) - if data and ( - last_processed_batch_timestamp is None - or time.time() - last_processed_batch_timestamp - >= self.throttle_seconds - ): - last_processed_batch_timestamp = time.time() - self.logger.info('Processing feeds batch for Adafruit IO') + if not should_process: + continue - for (feed, values) in data.items(): - if values: - value = statistics.mean(values) + last_processed_batch_timestamp = time.time() + self.logger.info('Processing feeds batch for Adafruit IO') - try: - self.send(feed, value, enqueue=False) - except ThrottlingError: - self.logger.warning( - 'Adafruit IO throttling threshold hit, taking a nap ' - + 'before retrying' - ) - time.sleep(self.throttle_seconds) + for feed, values in data.items(): + if values: + value = statistics.mean(values) - data = {} - except Exception as e: - self.logger.exception(e) + try: + self.send(feed, value, enqueue=False) + except ThrottlingError: + self.logger.warning( + 'Adafruit IO throttling threshold hit, taking a nap ' + + 'before retrying' + ) + self.wait_stop(self.throttle_interval) - return run + data = {} + except Exception as e: + self.logger.exception(e) @action - def send(self, feed, value, enqueue=True): + def send(self, feed: str, value: Union[int, float, str], enqueue: bool = True): """ - Send a value to an Adafruit IO feed + Send a value to an Adafruit IO feed. - :param feed: Feed name - :type feed: str - - :param value: Value to send - :type value: Numeric or string - - :param enqueue: If throttle_seconds is set, this method by default will append values to the throttling queue + :param feed: Feed name. + :param value: Value to send. + :param enqueue: If throttle_interval is set, this method by default will append values to the throttling queue to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to override the behaviour and send the message directly instead. - :type enqueue: bool """ - if not self.throttle_seconds or not enqueue: + if not self.throttle_interval or not enqueue: # If no throttling is configured, or enqueue is false then send the value directly to Adafruit self.aio.send(feed, value) else: - # Otherwise send it to the Redis queue to be picked up by the throttler thread - redis = self._get_redis() - redis.rpush(self._DATA_THROTTLER_QUEUE, json.dumps({feed: value})) + # Otherwise send it to the queue to be picked up by the throttler thread + self._data_throttler_queue.put({feed: value}) @action - def send_location_data(self, feed, lat, lon, ele, value): + def send_location_data( + self, + feed: str, + latitude: float, + longitude: float, + elevation: float, + value: Optional[Union[int, float, str]] = None, + ): """ Send location data to an Adafruit IO feed :param feed: Feed name - :type feed: str - :param lat: Latitude - :type lat: float - :param lon: Longitude - :type lon: float - :param ele: Elevation - :type ele: float - - :param value: Value to send - :type value: Numeric or string + :param value: Extra value to attach to the record """ self.aio.send_data( feed=feed, value=value, metadata={ - 'lat': lat, - 'lon': lon, - 'ele': ele, + 'lat': latitude, + 'lon': longitude, + 'ele': elevation, }, ) + @action + def subscribe(self, feed: str): + """ + Subscribe to a feed. + + :param feed: Feed name + """ + self._subscribe(feed) + + @action + def unsubscribe(self, feed: str): + """ + Unsubscribe from a feed. + + :param feed: Feed name + """ + self._unsubscribe(feed) + @classmethod def _cast_value(cls, value): try: @@ -220,17 +254,14 @@ class AdafruitIoPlugin(Plugin): ] @action - def receive(self, feed, limit=1): + def receive(self, feed: str, limit: int = 1): """ Receive data from the specified Adafruit IO feed :param feed: Feed name - :type feed: str - :param limit: Maximum number of data points to be returned. If None, all the values in the feed will be returned. Default: 1 (return most recent value) - :type limit: int """ if limit == 1: @@ -241,42 +272,84 @@ class AdafruitIoPlugin(Plugin): return values[:limit] if limit else values @action - def receive_next(self, feed): + def receive_next(self, feed: str): """ Receive the next unprocessed data point from a feed :param feed: Feed name - :type feed: str """ values = self._convert_data_to_dict(self.aio.receive_next(feed)) return values[0] if values else None @action - def receive_previous(self, feed): + def receive_previous(self, feed: str): """ Receive the last processed data point from a feed :param feed: Feed name - :type feed: str """ values = self._convert_data_to_dict(self.aio.receive_previous(feed)) return values[0] if values else None @action - def delete(self, feed, data_id): + def delete(self, feed: str, data_id: str): """ Delete a data point from a feed :param feed: Feed name - :type feed: str - :param data_id: Data point ID to remove - :type data_id: str """ self.aio.delete(feed, data_id) + def main(self): + if self.throttle_interval: + self._data_throttler_thread = Thread(target=self._data_throttler) + self._data_throttler_thread.start() + + try: + while not self.should_stop(): + cur_wait = 1 + max_wait = 60 + + try: + self._mqtt.connect() + cur_wait = 1 + self._mqtt.loop_blocking() + except Exception as e: + self.logger.warning( + 'Adafruit IO connection error: %s, retrying in %d seconds', + e, + cur_wait, + ) + + self.logger.exception(e) + self.wait_stop(cur_wait) + cur_wait = min(cur_wait * 2, max_wait) + finally: + self._stop_mqtt() + finally: + self._stop_data_throttler() + + def _stop_mqtt(self): + if self._mqtt_client: + self._mqtt_client.disconnect() + self._mqtt_client = None + + def _stop_data_throttler(self): + if self._data_throttler_thread: + self._data_throttler_thread.join() + self._data_throttler_thread = None + + def _stop(self): + self._stop_mqtt() + self._stop_data_throttler() + + def stop(self): + self._stop() + super().stop() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/adafruit/io/manifest.yaml b/platypush/plugins/adafruit/io/manifest.yaml index b346fa3c..968826cc 100644 --- a/platypush/plugins/adafruit/io/manifest.yaml +++ b/platypush/plugins/adafruit/io/manifest.yaml @@ -1,5 +1,8 @@ manifest: - events: {} + events: + - platypush.message.event.adafruit.AdafruitConnectedEvent + - platypush.message.event.adafruit.AdafruitDisconnectedEvent + - platypush.message.event.adafruit.AdafruitFeedUpdateEvent install: pip: - adafruit-io