Guard the start of the data throttler thread with a lock to prevent multiple launches

This commit is contained in:
Fabio Manganiello 2018-07-25 03:04:00 +02:00
parent cd52128e44
commit 423fc492cf

View file

@ -2,7 +2,7 @@ import ast
import statistics import statistics
import time import time
from threading import Thread from threading import Thread, Lock
from Adafruit_IO import Client from Adafruit_IO import Client
from Adafruit_IO.errors import ThrottlingError from Adafruit_IO.errors import ThrottlingError
@ -10,6 +10,8 @@ from platypush.context import get_backend
from platypush.message import Message from platypush.message import Message
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
data_throttler_lock = Lock()
class AdafruitIoPlugin(Plugin): class AdafruitIoPlugin(Plugin):
""" """
This plugin allows you to interact with the Adafruit IO This plugin allows you to interact with the Adafruit IO
@ -62,9 +64,10 @@ class AdafruitIoPlugin(Plugin):
self.aio = Client(username=username, key=key) self.aio = Client(username=username, key=key)
self.throttle_seconds = throttle_seconds self.throttle_seconds = throttle_seconds
if self.throttle_seconds: if self.throttle_seconds and not data_throttler_lock.locked():
redis = self._get_redis() redis = self._get_redis()
self.logger.info('Starting Adafruit IO throttler thread') self.logger.info('Starting Adafruit IO throttler thread')
data_throttler_lock.acquire(False)
self.data_throttler = Thread(target=self._data_throttler()) self.data_throttler = Thread(target=self._data_throttler())
self.data_throttler.start() self.data_throttler.start()
@ -85,32 +88,35 @@ class AdafruitIoPlugin(Plugin):
last_processed_batch_timestamp = None last_processed_batch_timestamp = None
data = {} data = {}
while True: try:
try: while True:
new_data = ast.literal_eval( try:
redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8')) new_data = ast.literal_eval(
redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8'))
for (key, value) in new_data.items(): for (key, value) in new_data.items():
data.setdefault(key, []).append(value) data.setdefault(key, []).append(value)
except QueueTimeoutError: except QueueTimeoutError:
pass pass
if data and (last_processed_batch_timestamp is None or if data and (last_processed_batch_timestamp is None or
time.time() - last_processed_batch_timestamp >= self.throttle_seconds): time.time() - last_processed_batch_timestamp >= self.throttle_seconds):
last_processed_batch_timestamp = time.time() last_processed_batch_timestamp = time.time()
self.logger.info('Processing feeds batch for Adafruit IO') self.logger.info('Processing feeds batch for Adafruit IO')
for (feed, values) in data.items(): for (feed, values) in data.items():
if values: if values:
value = statistics.mean(values) value = statistics.mean(values)
try: try:
self.send(feed, value, enqueue=False) self.send(feed, value, enqueue=False)
except ThrottlingError: except ThrottlingError:
self.logger.warning('Adafruit IO throttling threshold hit, taking a nap before retrying') self.logger.warning('Adafruit IO throttling threshold hit, taking a nap before retrying')
time.sleep(self.throttle_seconds) time.sleep(self.throttle_seconds)
data = {} data = {}
except Exception as e:
self.logger.exception(e)
return run return run