diff --git a/platypush/backend/adafruit/__init__.py b/platypush/backend/adafruit/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/platypush/backend/adafruit/io.py b/platypush/backend/adafruit/io.py new file mode 100644 index 0000000000..10eb2a9af0 --- /dev/null +++ b/platypush/backend/adafruit/io.py @@ -0,0 +1,81 @@ +from platypush.backend import Backend +from platypush.context import get_plugin +from platypush.message.event.adafruit import ConnectedEvent, DisconnectedEvent, \ + FeedUpdateEvent + + +class AdafruitIoBackend(Backend): + """ + Backend that listens to messages received over the Adafruit IO message queue + + Triggers: + + * :class:`platypush.message.event.adafruit.ConnectedEvent` when the + backend connects to the Adafruit queue + * :class:`platypush.message.event.adafruit.DisconnectedEvent` when the + backend disconnects from the Adafruit queue + * :class:`platypush.message.event.adafruit.FeedUpdateEvent` when an + update event is received on a monitored feed + + Requires: + + * The :class:`platypush.plugins.adafruit.io.AdafruitIoPlugin` plugin to + be active and configured. + """ + + def __init__(self, feeds, *args, **kwargs): + """ + :param feeds: List of feed IDs to monitor + :type feeds: list[str] + """ + + super().__init__(*args, **kwargs) + self.feeds = feeds + self._client = None + + def _init_client(self): + if self._client: + return + + from Adafruit_IO import MQTTClient + plugin = get_plugin('adafruit.io') + if not plugin: + raise RuntimeError('Adafruit IO plugin not configured') + + self._client = MQTTClient(plugin._username, plugin._key) + self._client.on_connect = self.on_connect() + self._client.on_disconnect = self.on_disconnect() + self._client.on_message = self.on_message() + + def on_connect(self): + def _handler(client): + self.bus.post(ConnectedEvent()) + return _handler + + def on_disconnect(self): + def _handler(client): + self.bus.post(DisconnectedEvent()) + return _handler + + def on_message(self): + def _handler(client, feed, data): + self.bus.post(FeedUpdateEvent(feed=feed, data=data)) + return _handler + + def run(self): + super().run() + + self.logger.info(('Initialized Adafruit IO backend, listening on ' + + 'feeds {}').format(self.feeds)) + + while not self.should_stop(): + try: + self._init_client() + self._client.connect() + self._client.loop_blocking() + except Exception as e: + self.logger.exception(e) + self._client = None + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/event/adafruit.py b/platypush/message/event/adafruit.py new file mode 100644 index 0000000000..f4895c243f --- /dev/null +++ b/platypush/message/event/adafruit.py @@ -0,0 +1,31 @@ +from platypush.message.event import Event + + +class ConnectedEvent(Event): + """ + Event triggered when the backend connects to the Adafruit message queue + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, feed=feed, data=data, **kwargs) + + +class DisconnectedEvent(Event): + """ + Event triggered when the backend disconnects from the Adafruit message queue + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, feed=feed, data=data, **kwargs) + + +class FeedUpdateEvent(Event): + """ + Event triggered upon Adafruit IO feed update + """ + + def __init__(self, feed, data, *args, **kwargs): + super().__init__(*args, feed=feed, data=data, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/adafruit/io.py b/platypush/plugins/adafruit/io.py index 0ce56dc345..c54d61a378 100644 --- a/platypush/plugins/adafruit/io.py +++ b/platypush/plugins/adafruit/io.py @@ -62,6 +62,9 @@ class AdafruitIoPlugin(Plugin): """ super().__init__(*args, **kwargs) + + self._username = username + self._key = key self.aio = Client(username=username, key=key) self.throttle_seconds = throttle_seconds @@ -169,24 +172,43 @@ class AdafruitIoPlugin(Plugin): self.aio.send_location_data(feed=feed, value=value, lat=lat, lon=lon, ele=ele) @action - def receive(self, feed): + def receive(self, feed, limit=1): """ - Receive the most recent value from an Adafruit IO feed and returns it - as a scalar (string or number) + Receive data from the specified Adafruit IO feed :param feed: Feed name :type feed: str + + :param limit: Maximum number of data points to be returned. If None, + all the values in the feed will be returned. Default: 1 (return most + recent value) """ - value = self.aio.receive(feed).value + if limit == 1: + value = self.aio.receive(feed).value - try: - value = float(value) - except ValueError: - pass + try: value = float(value) + except ValueError: pass + return value - return value + values = [i.value for i in self.aio.data(feed)] + if limit: + return values[-limit:] + return values + + @action + def delete(self, feed, data_id): + """ + Delete a data point from a feed + + :param feed: Feed name + :type feed: str + + :param data_id: Data point ID to remove + :type data_id: int + """ + + self.aio.delete(feed, data_id) # vim:sw=4:ts=4:et: -