Added throttle support to Adafruit IO plugin

This commit is contained in:
Fabio Manganiello 2018-07-25 01:53:56 +02:00
parent 3ab77e32f4
commit db98e6e05a

View file

@ -1,5 +1,11 @@
import ast
import statistics
import time
from threading import Thread
from Adafruit_IO import Client from Adafruit_IO import Client
from platypush.context import get_backend
from platypush.message import Message from platypush.message import Message
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
@ -13,6 +19,7 @@ class AdafruitIoPlugin(Plugin):
Requires: Requires:
* **adafruit-io** (``pip install adafruit-io``) * **adafruit-io** (``pip install adafruit-io``)
* Redis server running and Redis backend configured if you want to enable throttling
Some example usages:: Some example usages::
@ -36,21 +43,73 @@ class AdafruitIoPlugin(Plugin):
} }
""" """
def __init__(self, username, key, *args, **kwargs): _DATA_THROTTLER_QUEUE = 'platypush/adafruit.io'
def __init__(self, username, key, throttle_seconds=None, *args, **kwargs):
""" """
:param username: Your Adafruit username :param username: Your Adafruit username
:type username: str :type username: str
:param key: Your Adafruit IO key :param key: Your Adafruit IO key
:type key: str :type key: str
:param throttle_seconds: If set, then instead of sending the values directly over ``send`` the plugin will first collect all the samples within the specified period and then dispatch them to Adafruit IO. You may want to set it if you have data sources providing a lot of data points and you don't want to hit the throttling limitations of Adafruit.
:type throttle_seconds: float
""" """
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.aio = Client(username=username, key=key) self.aio = Client(username=username, key=key)
self.throttle_seconds = throttle_seconds
if self.throttle_seconds:
redis = self._get_redis()
self.logger.info('Starting Adafruit IO throttler thread')
self.data_throttler = Thread(target=self._data_throttler())
self.data_throttler.start()
def _get_redis(self):
from redis import Redis
redis_args = get_backend('redis').redis_args
redis_args['socket_timeout'] = 1
return Redis(**redis_args)
def _data_throttler(self):
from redis.exceptions import TimeoutError as QueueTimeoutError
def run():
redis = self._get_redis()
last_processed_batch_timestamp = None
data = {}
while True:
try:
new_data = ast.literal_eval(
redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8'))
for (key, value) in new_data.items():
data.setdefault(key, []).append(value)
except QueueTimeoutError:
pass
if data and (last_processed_batch_timestamp is None or
time.time() - last_processed_batch_timestamp >= self.throttle_seconds):
self.logger.info('Processing feeds batch for Adafruit IO')
for (feed, values) in data.items():
if values:
value = statistics.mean(values)
self.send(feed, value, enqueue=False)
last_processed_batch_timestamp = time.time()
data = {}
return run
@action @action
def send(self, feed, value): def send(self, feed, value, enqueue=True):
""" """
Send a value to an Adafruit IO feed Send a value to an Adafruit IO feed
@ -59,9 +118,18 @@ class AdafruitIoPlugin(Plugin):
:param value: Value to send :param value: Value to send
:type value: Numeric or string :type value: Numeric or string
:param enqueue: If throttle_seconds is set, this method by default will append values to the throttling queue to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to override the behaviour and send the message directly instead.
:type enqueue: bool
""" """
self.aio.send(feed, value) if not self.throttle_seconds or not enqueue:
# If no throttling is configured, or enqueue is false then send the value directly to Adafruit
self.aio.send(feed, value)
else:
# Otherwise send it to the Redis queue to be picked up by the throttler thread
redis = self._get_redis()
redis.rpush(self._DATA_THROTTLER_QUEUE, {feed:value})
@action @action