From 423fc492cf5b8619829481439ba125bb86d64772 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 25 Jul 2018 03:04:00 +0200 Subject: [PATCH] Guard the start of the data throttler thread with a lock to prevent multiple launches --- platypush/plugins/adafruit/io.py | 52 ++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/platypush/plugins/adafruit/io.py b/platypush/plugins/adafruit/io.py index 9700b1ccd1..0658033923 100644 --- a/platypush/plugins/adafruit/io.py +++ b/platypush/plugins/adafruit/io.py @@ -2,7 +2,7 @@ import ast import statistics import time -from threading import Thread +from threading import Thread, Lock from Adafruit_IO import Client from Adafruit_IO.errors import ThrottlingError @@ -10,6 +10,8 @@ from platypush.context import get_backend from platypush.message import Message from platypush.plugins import Plugin, action +data_throttler_lock = Lock() + class AdafruitIoPlugin(Plugin): """ This plugin allows you to interact with the Adafruit IO @@ -62,9 +64,10 @@ class AdafruitIoPlugin(Plugin): self.aio = Client(username=username, key=key) self.throttle_seconds = throttle_seconds - if self.throttle_seconds: + if self.throttle_seconds and not data_throttler_lock.locked(): redis = self._get_redis() self.logger.info('Starting Adafruit IO throttler thread') + data_throttler_lock.acquire(False) self.data_throttler = Thread(target=self._data_throttler()) self.data_throttler.start() @@ -85,32 +88,35 @@ class AdafruitIoPlugin(Plugin): last_processed_batch_timestamp = None data = {} - while True: - try: - new_data = ast.literal_eval( - redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8')) + try: + while True: + try: + new_data = ast.literal_eval( + redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8')) - for (key, value) in new_data.items(): - data.setdefault(key, []).append(value) - except QueueTimeoutError: - pass + for (key, value) in new_data.items(): + data.setdefault(key, []).append(value) + except QueueTimeoutError: + pass - if data and (last_processed_batch_timestamp is None or - time.time() - last_processed_batch_timestamp >= self.throttle_seconds): - last_processed_batch_timestamp = time.time() - self.logger.info('Processing feeds batch for Adafruit IO') + if data and (last_processed_batch_timestamp is None or + time.time() - last_processed_batch_timestamp >= self.throttle_seconds): + last_processed_batch_timestamp = time.time() + self.logger.info('Processing feeds batch for Adafruit IO') - for (feed, values) in data.items(): - if values: - value = statistics.mean(values) + for (feed, values) in data.items(): + if values: + value = statistics.mean(values) - try: - self.send(feed, value, enqueue=False) - except ThrottlingError: - self.logger.warning('Adafruit IO throttling threshold hit, taking a nap before retrying') - time.sleep(self.throttle_seconds) + try: + self.send(feed, value, enqueue=False) + except ThrottlingError: + self.logger.warning('Adafruit IO throttling threshold hit, taking a nap before retrying') + time.sleep(self.throttle_seconds) - data = {} + data = {} + except Exception as e: + self.logger.exception(e) return run