From 284e0638f86bff929eee74e751a22b25b7215d9c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 25 Oct 2018 20:45:58 +0200 Subject: [PATCH] Always define msg so the function doesn't fail on exception Reverted the previous retry logic for backend - it didn't really work This reverts commit 4e0e4863a001968a0a3217b2b887769e84cc0726. This reverts commit 964c7b5cf08cc74fac551e641068ec1e05e6e0df. This reverts commit 6ce348365f8a5b10e16508b17e825b36e24511fd. --- platypush/backend/__init__.py | 33 +-- .../backend/assistant/google/__init__.py | 3 +- .../backend/assistant/google/pushtotalk.py | 3 +- .../backend/assistant/snowboy/__init__.py | 3 +- platypush/backend/button/flic/__init__.py | 3 +- .../backend/button/flic/fliclib/aioflic.py | 206 ++++++++--------- .../backend/button/flic/fliclib/fliclib.py | 216 +++++++++--------- platypush/backend/camera/pi.py | 3 +- platypush/backend/http/__init__.py | 3 +- platypush/backend/http/poll/__init__.py | 3 +- platypush/backend/inotify/__init__.py | 3 +- platypush/backend/joystick.py | 3 +- platypush/backend/kafka/__init__.py | 3 +- platypush/backend/local/__init__.py | 3 +- platypush/backend/midi.py | 3 +- platypush/backend/mqtt.py | 3 +- platypush/backend/music/mpd/__init__.py | 3 +- platypush/backend/pushbullet/__init__.py | 3 +- platypush/backend/redis.py | 13 +- platypush/backend/scard/__init__.py | 3 +- platypush/backend/sensor/__init__.py | 3 +- .../backend/sensor/ir/zeroborg/__init__.py | 4 +- platypush/backend/sensor/leap.py | 4 +- platypush/backend/tcp.py | 4 +- platypush/backend/weather/forecast.py | 3 +- platypush/bus/redis.py | 2 + platypush/plugins/adafruit/io.py | 2 +- platypush/plugins/light/hue/__init__.py | 2 +- 28 files changed, 267 insertions(+), 273 deletions(-) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 346ea5dd66..3aafa7ee6d 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -7,7 +7,6 @@ import importlib import logging import sys import threading -import time from threading import Thread @@ -35,7 +34,6 @@ class Backend(Thread): """ _default_response_timeout = 5 - _backend_reload_timeout = 10 def __init__(self, bus=None, **kwargs): """ @@ -213,38 +211,9 @@ class Backend(Thread): redis.send_message(msg, queue_name=queue_name) - def exec(self): - """ Backend thread logic. To be implemented in the derived classes """ - raise RuntimeError('Backend class {} does not implement the exec() method'. - format(self.__class__.__name__)) - def run(self): - """ Thread runner. It wraps the exec method in a retry block """ - - super().run() + """ Starts the backend thread. To be implemented in the derived classes """ self.thread_id = threading.get_ident() - error = None - - while not self.should_stop(): - try: - self.exec() - except Exception as e: - error = e - - if not self.should_stop(): - if error: - self.logger.error(('Backend {} terminated with an exception, ' + - 'reloading in {} seconds').format( - self.__class__.__name__, - self._backend_reload_timeout)) - self.logger.exception(error) - else: - self.logger.warning(('Backend {} unexpectedly terminated, ' + - 'reloading in {} seconds').format( - self.__class__.__name__, - self._backend_reload_timeout)) - - time.sleep(self._backend_reload_timeout) def on_stop(self): """ Callback invoked when the process stops """ diff --git a/platypush/backend/assistant/google/__init__.py b/platypush/backend/assistant/google/__init__.py index 154654c33f..07bbc27b89 100644 --- a/platypush/backend/assistant/google/__init__.py +++ b/platypush/backend/assistant/google/__init__.py @@ -110,7 +110,8 @@ class AssistantGoogleBackend(Backend): self.assistant.stop_conversation() - def exec(self): + def run(self): + super().run() with Assistant(self.credentials, self.device_model_id) as assistant: self.assistant = assistant diff --git a/platypush/backend/assistant/google/pushtotalk.py b/platypush/backend/assistant/google/pushtotalk.py index 862a917d9f..74d298630f 100644 --- a/platypush/backend/assistant/google/pushtotalk.py +++ b/platypush/backend/assistant/google/pushtotalk.py @@ -176,8 +176,9 @@ class AssistantGooglePushtotalkBackend(Backend): """ Speech recognized handler """ self.bus.post(SpeechRecognizedEvent(phrase=speech)) - def exec(self): + def run(self): """ Backend executor """ + super().run() with SampleAssistant(self.lang, self.device_model_id, self.device_id, self.conversation_stream, diff --git a/platypush/backend/assistant/snowboy/__init__.py b/platypush/backend/assistant/snowboy/__init__.py index 3e95bb0034..6d63e2265b 100644 --- a/platypush/backend/assistant/snowboy/__init__.py +++ b/platypush/backend/assistant/snowboy/__init__.py @@ -72,7 +72,8 @@ class AssistantSnowboyBackend(Backend): self.bus.post(HotwordDetectedEvent(hotword=self.hotword)) return callback - def exec(self): + def run(self): + super().run() self.detector.start(self.hotword_detected()) diff --git a/platypush/backend/button/flic/__init__.py b/platypush/backend/button/flic/__init__.py index 5378c3ac04..bf9f1f670b 100644 --- a/platypush/backend/button/flic/__init__.py +++ b/platypush/backend/button/flic/__init__.py @@ -112,7 +112,8 @@ class ButtonFlicBackend(Backend): return _f - def exec(self): + def run(self): + super().run() self.client.handle_events() diff --git a/platypush/backend/button/flic/fliclib/aioflic.py b/platypush/backend/button/flic/fliclib/aioflic.py index 70787ffaa4..31c9d3df53 100644 --- a/platypush/backend/button/flic/fliclib/aioflic.py +++ b/platypush/backend/button/flic/fliclib/aioflic.py @@ -36,12 +36,12 @@ class RemovedReason(Enum): RemovedByThisClient = 0 ForceDisconnectedByThisClient = 1 ForceDisconnectedByOtherClient = 2 - + ButtonIsPrivate = 3 VerifyTimeout = 4 InternetBackendError = 5 InvalidData = 6 - + CouldntLoadDevice = 7 class ClickType(Enum): @@ -77,22 +77,22 @@ class ScanWizardResult(Enum): class ButtonScanner: """ButtonScanner class. - + Usage: scanner = ButtonScanner() scanner.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: ... client.add_scanner(scanner) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_id = next(ButtonScanner._cnt) self.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: None class ScanWizard: """ScanWizard class - + Usage: wizard = ScanWizard() wizard.on_found_private_button = lambda scan_wizard: ... @@ -101,9 +101,9 @@ class ScanWizard: wizard.on_completed = lambda scan_wizard, result, bd_addr, name: ... client.add_scan_wizard(wizard) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_wizard_id = next(ScanWizard._cnt) self._bd_addr = None @@ -115,31 +115,31 @@ class ScanWizard: class ButtonConnectionChannel: """ButtonConnectionChannel class. - + This class represents a connection channel to a Flic button. Add this button connection channel to a FlicClient by executing client.add_connection_channel(connection_channel). You may only have this connection channel added to one FlicClient at a time. - + Before you add the connection channel to the client, you should set up your callback functions by assigning the corresponding properties to this object with a function. Each callback function has a channel parameter as the first one, referencing this object. - + Available properties and the function parameters are: on_create_connection_channel_response: channel, error, connection_status on_removed: channel, removed_reason on_connection_status_changed: channel, connection_status, disconnect_reason on_button_up_or_down / on_button_click_or_hold / on_button_single_or_double_click / on_button_single_or_double_click_or_hold: channel, click_type, was_queued, time_diff """ - + _cnt = itertools.count() - + def __init__(self, bd_addr, latency_mode = LatencyMode.NormalLatency, auto_disconnect_time = 511): self._conn_id = next(ButtonConnectionChannel._cnt) self._bd_addr = bd_addr self._latency_mode = latency_mode self._auto_disconnect_time = auto_disconnect_time self._client = None - + self.on_create_connection_channel_response = lambda channel, error, connection_status: None self.on_removed = lambda channel, removed_reason: None self.on_connection_status_changed = lambda channel, connection_status, disconnect_reason: None @@ -147,21 +147,21 @@ class ButtonConnectionChannel: self.on_button_click_or_hold = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click_or_hold = lambda channel, click_type, was_queued, time_diff: None - + @property def bd_addr(self): return self._bd_addr - + @property def latency_mode(self): return self._latency_mode - + @latency_mode.setter def latency_mode(self, latency_mode): if self._client is None: self._latency_mode = latency_mode return - + self._latency_mode = latency_mode if not self._client._closed: self._client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time}) @@ -169,39 +169,39 @@ class ButtonConnectionChannel: @property def auto_disconnect_time(self): return self._auto_disconnect_time - + @auto_disconnect_time.setter def auto_disconnect_time(self, auto_disconnect_time): if self._client is None: self._auto_disconnect_time = auto_disconnect_time return - + self._auto_disconnect_time = auto_disconnect_time if not self._client._closed: self._client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time}) class FlicClient(asyncio.Protocol): """FlicClient class. - + When this class is constructed, a socket connection is established. You may then send commands to the server and set timers. Once you are ready with the initialization you must call the handle_events() method which is a main loop that never exits, unless the socket is closed. For a more detailed description of all commands, events and enums, check the protocol specification. - + All commands are wrapped in more high level functions and events are reported using callback functions. - + All methods called on this class will take effect only if you eventually call the handle_events() method. - + The ButtonScanner is used to set up a handler for advertisement packets. The ButtonConnectionChannel is used to interact with connections to flic buttons and receive their events. - + Other events are handled by the following callback functions that can be assigned to this object (and a list of the callback function parameters): on_new_verified_button: bd_addr on_no_space_for_new_connection: max_concurrently_connected_buttons on_got_space_for_new_connection: max_concurrently_connected_buttons on_bluetooth_controller_state_change: state """ - + _EVENTS = [ ("EvtAdvertisementPacket", "= len(FlicClient._EVENTS) or FlicClient._EVENTS[opcode] == None: return - + event_name = FlicClient._EVENTS[opcode][0] data_tuple = FlicClient._EVENT_STRUCTS[opcode].unpack(data[1 : 1 + FlicClient._EVENT_STRUCTS[opcode].size]) items = FlicClient._EVENT_NAMED_TUPLES[opcode]._make(data_tuple)._asdict() - + # Process some kind of items whose data type is not supported by struct if "bd_addr" in items: items["bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["bd_addr"]) - + if "name" in items: items["name"] = items["name"].decode("utf-8") - + if event_name == "EvtCreateConnectionChannelResponse": items["error"] = CreateConnectionChannelError(items["error"]) items["connection_status"] = ConnectionStatus(items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": items["connection_status"] = ConnectionStatus(items["connection_status"]) items["disconnect_reason"] = DisconnectReason(items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": items["removed_reason"] = RemovedReason(items["removed_reason"]) - + if event_name.startswith("EvtButton"): items["click_type"] = ClickType(items["click_type"]) - + if event_name == "EvtGetInfoResponse": items["bluetooth_controller_state"] = BluetoothControllerState(items["bluetooth_controller_state"]) items["my_bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["my_bd_addr"]) items["my_bd_addr_type"] = BdAddrType(items["my_bd_addr_type"]) items["bd_addr_of_verified_buttons"] = [] - + pos = FlicClient._EVENT_STRUCTS[opcode].size for i in range(items["nb_verified_buttons"]): items["bd_addr_of_verified_buttons"].append(FlicClient._bdaddr_bytes_to_string(data[1 + pos : 1 + pos + 6])) pos += 6 - + if event_name == "EvtBluetoothControllerStateChange": items["state"] = BluetoothControllerState(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": items["uuid"] = "".join(map(lambda x: "%02x" % x, items["uuid"])) if items["uuid"] == "00000000000000000000000000000000": items["uuid"] = None - + if event_name == "EvtScanWizardCompleted": items["result"] = ScanWizardResult(items["result"]) - + # Process event if event_name == "EvtAdvertisementPacket": scanner = self._scanners.get(items["scan_id"]) if scanner is not None: scanner.on_advertisement_packet(scanner, items["bd_addr"], items["name"], items["rssi"], items["is_private"], items["already_verified"]) - + if event_name == "EvtCreateConnectionChannelResponse": channel = self._connection_channels[items["conn_id"]] if items["error"] != CreateConnectionChannelError.NoError: del self._connection_channels[items["conn_id"]] channel.on_create_connection_channel_response(channel, items["error"], items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": channel = self._connection_channels[items["conn_id"]] channel.on_connection_status_changed(channel, items["connection_status"], items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": channel = self._connection_channels[items["conn_id"]] del self._connection_channels[items["conn_id"]] channel.on_removed(channel, items["removed_reason"]) - + if event_name == "EvtButtonUpOrDown": channel = self._connection_channels[items["conn_id"]] channel.on_button_up_or_down(channel, items["click_type"], items["was_queued"], items["time_diff"]) @@ -498,45 +498,45 @@ class FlicClient(asyncio.Protocol): if event_name == "EvtButtonSingleOrDoubleClickOrHold": channel = self._connection_channels[items["conn_id"]] channel.on_button_single_or_double_click_or_hold(channel, items["click_type"], items["was_queued"], items["time_diff"]) - + if event_name == "EvtNewVerifiedButton": self.on_new_verified_button(items["bd_addr"]) - + if event_name == "EvtGetInfoResponse": self.on_get_info(items) - + if event_name == "EvtNoSpaceForNewConnection": self.on_no_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtGotSpaceForNewConnection": self.on_got_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtBluetoothControllerStateChange": self.on_bluetooth_controller_state_change(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": self.on_get_button_uuid(items["bd_addr"], items["uuid"]) - + if event_name == "EvtScanWizardFoundPrivateButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_found_private_button(scan_wizard) - + if event_name == "EvtScanWizardFoundPublicButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard._bd_addr = items["bd_addr"] scan_wizard._name = items["name"] scan_wizard.on_found_public_button(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardButtonConnected": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_button_connected(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardCompleted": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] del self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_completed(scan_wizard, items["result"], scan_wizard._bd_addr, scan_wizard._name) - - + + def data_received(self,data): cdata=self.buffer+data self.buffer=b"" @@ -550,5 +550,5 @@ class FlicClient(asyncio.Protocol): if len(cdata): self.buffer=cdata #unlikely to happen but..... break - + diff --git a/platypush/backend/button/flic/fliclib/fliclib.py b/platypush/backend/button/flic/fliclib/fliclib.py index 2a0e7959ff..6e48811ffa 100644 --- a/platypush/backend/button/flic/fliclib/fliclib.py +++ b/platypush/backend/button/flic/fliclib/fliclib.py @@ -40,12 +40,12 @@ class RemovedReason(Enum): RemovedByThisClient = 0 ForceDisconnectedByThisClient = 1 ForceDisconnectedByOtherClient = 2 - + ButtonIsPrivate = 3 VerifyTimeout = 4 InternetBackendError = 5 InvalidData = 6 - + CouldntLoadDevice = 7 class ClickType(Enum): @@ -81,22 +81,22 @@ class ScanWizardResult(Enum): class ButtonScanner: """ButtonScanner class. - + Usage: scanner = ButtonScanner() scanner.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: ... client.add_scanner(scanner) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_id = next(ButtonScanner._cnt) self.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: None class ScanWizard: """ScanWizard class - + Usage: wizard = ScanWizard() wizard.on_found_private_button = lambda scan_wizard: ... @@ -105,9 +105,9 @@ class ScanWizard: wizard.on_completed = lambda scan_wizard, result, bd_addr, name: ... client.add_scan_wizard(wizard) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_wizard_id = next(ScanWizard._cnt) self._bd_addr = None @@ -119,31 +119,31 @@ class ScanWizard: class ButtonConnectionChannel: """ButtonConnectionChannel class. - + This class represents a connection channel to a Flic button. Add this button connection channel to a FlicClient by executing client.add_connection_channel(connection_channel). You may only have this connection channel added to one FlicClient at a time. - + Before you add the connection channel to the client, you should set up your callback functions by assigning the corresponding properties to this object with a function. Each callback function has a channel parameter as the first one, referencing this object. - + Available properties and the function parameters are: on_create_connection_channel_response: channel, error, connection_status on_removed: channel, removed_reason on_connection_status_changed: channel, connection_status, disconnect_reason on_button_up_or_down / on_button_click_or_hold / on_button_single_or_double_click / on_button_single_or_double_click_or_hold: channel, click_type, was_queued, time_diff """ - + _cnt = itertools.count() - + def __init__(self, bd_addr, latency_mode = LatencyMode.NormalLatency, auto_disconnect_time = 511): self._conn_id = next(ButtonConnectionChannel._cnt) self._bd_addr = bd_addr self._latency_mode = latency_mode self._auto_disconnect_time = auto_disconnect_time self._client = None - + self.on_create_connection_channel_response = lambda channel, error, connection_status: None self.on_removed = lambda channel, removed_reason: None self.on_connection_status_changed = lambda channel, connection_status, disconnect_reason: None @@ -151,36 +151,36 @@ class ButtonConnectionChannel: self.on_button_click_or_hold = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click_or_hold = lambda channel, click_type, was_queued, time_diff: None - + @property def bd_addr(self): return self._bd_addr - + @property def latency_mode(self): return self._latency_mode - + @latency_mode.setter def latency_mode(self, latency_mode): if self._client is None: self._latency_mode = latency_mode return - + with self._client._lock: self._latency_mode = latency_mode if not self._client._closed: self._client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time}) - + @property def auto_disconnect_time(self): return self._auto_disconnect_time - + @auto_disconnect_time.setter def auto_disconnect_time(self, auto_disconnect_time): if self._client is None: self._auto_disconnect_time = auto_disconnect_time return - + with self._client._lock: self._auto_disconnect_time = auto_disconnect_time if not self._client._closed: @@ -188,26 +188,26 @@ class ButtonConnectionChannel: class FlicClient: """FlicClient class. - + When this class is constructed, a socket connection is established. You may then send commands to the server and set timers. Once you are ready with the initialization you must call the handle_events() method which is a main loop that never exits, unless the socket is closed. For a more detailed description of all commands, events and enums, check the protocol specification. - + All commands are wrapped in more high level functions and events are reported using callback functions. - + All methods called on this class will take effect only if you eventually call the handle_events() method. - + The ButtonScanner is used to set up a handler for advertisement packets. The ButtonConnectionChannel is used to interact with connections to flic buttons and receive their events. - + Other events are handled by the following callback functions that can be assigned to this object (and a list of the callback function parameters): on_new_verified_button: bd_addr on_no_space_for_new_connection: max_concurrently_connected_buttons on_got_space_for_new_connection: max_concurrently_connected_buttons on_bluetooth_controller_state_change: state """ - + _EVENTS = [ ("EvtAdvertisementPacket", "= len(FlicClient._EVENTS) or FlicClient._EVENTS[opcode] == None: return - + event_name = FlicClient._EVENTS[opcode][0] data_tuple = FlicClient._EVENT_STRUCTS[opcode].unpack(data[1 : 1 + FlicClient._EVENT_STRUCTS[opcode].size]) items = FlicClient._EVENT_NAMED_TUPLES[opcode]._make(data_tuple)._asdict() - + # Process some kind of items whose data type is not supported by struct if "bd_addr" in items: items["bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["bd_addr"]) - + if "name" in items: items["name"] = items["name"].decode("utf-8") - + if event_name == "EvtCreateConnectionChannelResponse": items["error"] = CreateConnectionChannelError(items["error"]) items["connection_status"] = ConnectionStatus(items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": items["connection_status"] = ConnectionStatus(items["connection_status"]) items["disconnect_reason"] = DisconnectReason(items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": items["removed_reason"] = RemovedReason(items["removed_reason"]) - + if event_name.startswith("EvtButton"): items["click_type"] = ClickType(items["click_type"]) - + if event_name == "EvtGetInfoResponse": items["bluetooth_controller_state"] = BluetoothControllerState(items["bluetooth_controller_state"]) items["my_bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["my_bd_addr"]) items["my_bd_addr_type"] = BdAddrType(items["my_bd_addr_type"]) items["bd_addr_of_verified_buttons"] = [] - + pos = FlicClient._EVENT_STRUCTS[opcode].size for i in range(items["nb_verified_buttons"]): items["bd_addr_of_verified_buttons"].append(FlicClient._bdaddr_bytes_to_string(data[1 + pos : 1 + pos + 6])) pos += 6 - + if event_name == "EvtBluetoothControllerStateChange": items["state"] = BluetoothControllerState(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": items["uuid"] = "".join(map(lambda x: "%02x" % x, items["uuid"])) if items["uuid"] == "00000000000000000000000000000000": items["uuid"] = None - + if event_name == "EvtScanWizardCompleted": items["result"] = ScanWizardResult(items["result"]) - + # Process event if event_name == "EvtAdvertisementPacket": scanner = self._scanners.get(items["scan_id"]) if scanner is not None: scanner.on_advertisement_packet(scanner, items["bd_addr"], items["name"], items["rssi"], items["is_private"], items["already_verified"]) - + if event_name == "EvtCreateConnectionChannelResponse": channel = self._connection_channels[items["conn_id"]] if items["error"] != CreateConnectionChannelError.NoError: del self._connection_channels[items["conn_id"]] channel.on_create_connection_channel_response(channel, items["error"], items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": channel = self._connection_channels[items["conn_id"]] channel.on_connection_status_changed(channel, items["connection_status"], items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": channel = self._connection_channels[items["conn_id"]] del self._connection_channels[items["conn_id"]] channel.on_removed(channel, items["removed_reason"]) - + if event_name == "EvtButtonUpOrDown": channel = self._connection_channels[items["conn_id"]] channel.on_button_up_or_down(channel, items["click_type"], items["was_queued"], items["time_diff"]) @@ -521,44 +521,44 @@ class FlicClient: if event_name == "EvtButtonSingleOrDoubleClickOrHold": channel = self._connection_channels[items["conn_id"]] channel.on_button_single_or_double_click_or_hold(channel, items["click_type"], items["was_queued"], items["time_diff"]) - + if event_name == "EvtNewVerifiedButton": self.on_new_verified_button(items["bd_addr"]) - + if event_name == "EvtGetInfoResponse": self._get_info_response_queue.get()(items) - + if event_name == "EvtNoSpaceForNewConnection": self.on_no_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtGotSpaceForNewConnection": self.on_got_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtBluetoothControllerStateChange": self.on_bluetooth_controller_state_change(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": self._get_button_uuid_queue.get()(items["bd_addr"], items["uuid"]) - + if event_name == "EvtScanWizardFoundPrivateButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_found_private_button(scan_wizard) - + if event_name == "EvtScanWizardFoundPublicButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard._bd_addr = items["bd_addr"] scan_wizard._name = items["name"] scan_wizard.on_found_public_button(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardButtonConnected": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_button_connected(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardCompleted": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] del self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_completed(scan_wizard, items["result"], scan_wizard._bd_addr, scan_wizard._name) - + def _handle_one_event(self): if len(self._timers.queue) > 0: current_timer = self._timers.queue[0] @@ -568,10 +568,10 @@ class FlicClient: return True if len(select.select([self._sock], [], [], timeout)[0]) == 0: return True - + len_arr = bytearray(2) view = memoryview(len_arr) - + toread = 2 while toread > 0: nbytes = self._sock.recv_into(view, toread) @@ -579,7 +579,7 @@ class FlicClient: return False view = view[nbytes:] toread -= nbytes - + packet_len = len_arr[0] | (len_arr[1] << 8) data = bytearray(packet_len) view = memoryview(data) @@ -590,13 +590,13 @@ class FlicClient: return False view = view[nbytes:] toread -= nbytes - + self._dispatch_event(data) return True - + def handle_events(self): """Start the main loop for this client. - + This method will not return until the socket has been closed. Once it has returned, any use of this FlicClient is illegal. """ diff --git a/platypush/backend/camera/pi.py b/platypush/backend/camera/pi.py index 87b56bd257..7c419a015d 100644 --- a/platypush/backend/camera/pi.py +++ b/platypush/backend/camera/pi.py @@ -160,7 +160,8 @@ class CameraPiBackend(Backend): self.logger.warning('Failed to stop recording') self.logger.exception(e) - def exec(self): + def run(self): + super().run() if not self.redis: self.redis = get_backend('redis') diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index 9eef5d0cb0..f7afa70b45 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -377,7 +377,8 @@ class HttpBackend(Backend): loop.run_forever() - def exec(self): + def run(self): + super().run() os.putenv('FLASK_APP', 'platypush') os.putenv('FLASK_ENV', 'production') self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) diff --git a/platypush/backend/http/poll/__init__.py b/platypush/backend/http/poll/__init__.py index e8ed95dc37..52dccea276 100644 --- a/platypush/backend/http/poll/__init__.py +++ b/platypush/backend/http/poll/__init__.py @@ -72,7 +72,8 @@ class HttpPollBackend(Backend): self.requests.append(request) - def exec(self): + def run(self): + super().run() while not self.should_stop(): for request in self.requests: diff --git a/platypush/backend/inotify/__init__.py b/platypush/backend/inotify/__init__.py index 6336858dfb..8dae146b1f 100644 --- a/platypush/backend/inotify/__init__.py +++ b/platypush/backend/inotify/__init__.py @@ -48,7 +48,8 @@ class InotifyBackend(Backend): self.inotify_watch = None - def exec(self): + def run(self): + super().run() self.inotify_watch = inotify.adapters.Inotify() for path in self.watch_paths: diff --git a/platypush/backend/joystick.py b/platypush/backend/joystick.py index d6554f4473..f3041492f8 100644 --- a/platypush/backend/joystick.py +++ b/platypush/backend/joystick.py @@ -29,7 +29,8 @@ class JoystickBackend(Backend): self.device = device - def exec(self): + def run(self): + super().run() self.logger.info('Initialized joystick backend on device {}'.format(self.device)) while not self.should_stop(): diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index e8b00d5205..bce8b0529f 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -81,7 +81,8 @@ class KafkaBackend(Backend): self.logger.warning('Exception occurred while closing Kafka connection') self.logger.exception(e) - def exec(self): + def run(self): + super().run() self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) self.logger.info('Initialized kafka backend - server: {}, topic: {}' diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py index 19e8a8637b..398ab5f12f 100644 --- a/platypush/backend/local/__init__.py +++ b/platypush/backend/local/__init__.py @@ -61,7 +61,8 @@ class LocalBackend(Backend): return Message.build(msg) if len(msg) else None - def exec(self): + def run(self): + super().run() self.logger.info('Initialized local backend on {} and {}'. format(self.request_fifo, self.response_fifo)) diff --git a/platypush/backend/midi.py b/platypush/backend/midi.py index 5214729110..e4f08630b3 100644 --- a/platypush/backend/midi.py +++ b/platypush/backend/midi.py @@ -102,7 +102,8 @@ class MidiBackend(Backend): return callback - def exec(self): + def run(self): + super().run() self.midi.open_port(self.port_number) self.logger.info('Initialized MIDI backend, listening for events on device {}'. diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index b2701fc910..899897d4b3 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -45,7 +45,7 @@ class MqttBackend(Backend): publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) - def exec(self): + def run(self): def on_connect(client, userdata, flags, rc): client.subscribe(self.topic) @@ -57,6 +57,7 @@ class MqttBackend(Backend): self.logger.info('Received message on the MQTT backend: {}'.format(msg)) self.on_message(msg) + super().run() client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message diff --git a/platypush/backend/music/mpd/__init__.py b/platypush/backend/music/mpd/__init__.py index 64455348af..14f4261928 100644 --- a/platypush/backend/music/mpd/__init__.py +++ b/platypush/backend/music/mpd/__init__.py @@ -39,7 +39,8 @@ class MusicMpdBackend(Backend): self.poll_seconds = poll_seconds - def exec(self): + def run(self): + super().run() last_status = {} last_state = None diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index 91ff6a628e..251fbc7520 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -175,7 +175,8 @@ class PushbulletBackend(Backend): def on_stop(self): self.ws.close() - def exec(self): + def run(self): + super().run() self._init_socket() self.logger.info('Initialized Pushbullet backend - device_id: {}' diff --git a/platypush/backend/redis.py b/platypush/backend/redis.py index 3c335dddcc..352f921598 100644 --- a/platypush/backend/redis.py +++ b/platypush/backend/redis.py @@ -31,21 +31,19 @@ class RedisBackend(Backend): self.queue = queue self.redis_args = redis_args - self.redis = None + self.redis = Redis(**self.redis_args) - def _get_redis(self): - return Redis(**self.redis_args) def send_message(self, msg, queue_name=None): if queue_name: - self._get_redis().rpush(queue_name, msg) + self.redis.rpush(queue_name, msg) else: - self._get_redis().rpush(self.queue, msg) + self.redis.rpush(self.queue, msg) def get_message(self, queue_name=None): queue = queue_name or self.queue - msg = self._get_redis().blpop(queue)[1].decode('utf-8') + msg = self.redis.blpop(queue)[1].decode('utf-8') try: msg = Message.build(json.loads(msg)) @@ -62,7 +60,8 @@ class RedisBackend(Backend): return msg - def exec(self): + def run(self): + super().run() self.logger.info('Initialized Redis backend on queue {} with arguments {}'. format(self.queue, self.redis_args)) diff --git a/platypush/backend/scard/__init__.py b/platypush/backend/scard/__init__.py index 1ea3cb2559..2dcf231ed0 100644 --- a/platypush/backend/scard/__init__.py +++ b/platypush/backend/scard/__init__.py @@ -50,7 +50,8 @@ class ScardBackend(Backend): self.cardtype = AnyCardType() - def exec(self): + def run(self): + super().run() self.logger.info('Initialized smart card reader backend - ATR filter: {}'. format(self.ATRs)) diff --git a/platypush/backend/sensor/__init__.py b/platypush/backend/sensor/__init__.py index 7d065a97c3..1ce1f86f2e 100644 --- a/platypush/backend/sensor/__init__.py +++ b/platypush/backend/sensor/__init__.py @@ -40,7 +40,8 @@ class SensorBackend(Backend): """ To be implemented in the derived classes """ raise NotImplementedError('To be implemented in a derived class') - def exec(self): + def run(self): + super().run() self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__)) while not self.should_stop(): diff --git a/platypush/backend/sensor/ir/zeroborg/__init__.py b/platypush/backend/sensor/ir/zeroborg/__init__.py index b17c5acaaf..a26af8b81e 100644 --- a/platypush/backend/sensor/ir/zeroborg/__init__.py +++ b/platypush/backend/sensor/ir/zeroborg/__init__.py @@ -32,7 +32,9 @@ class SensorIrZeroborgBackend(Backend): self.logger.info('Initialized Zeroborg infrared sensor backend') - def exec(self): + def run(self): + super().run() + while True: try: self.zb.GetIrMessage() diff --git a/platypush/backend/sensor/leap.py b/platypush/backend/sensor/leap.py index 4475edd43b..8caed2b5bb 100644 --- a/platypush/backend/sensor/leap.py +++ b/platypush/backend/sensor/leap.py @@ -61,7 +61,9 @@ class SensorLeapBackend(Backend): self.position_tolerance = position_tolerance - def exec(self): + def run(self): + super().run() + listener = LeapListener(position_ranges=self.position_ranges, position_tolerance=self.position_tolerance) diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index 69bd5ce719..37faf63ec6 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -90,7 +90,9 @@ class TcpBackend(Backend): threading.Thread(target=_f_wrapper).run() - def exec(self): + def run(self): + super().run() + serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv_sock.bind((self.bind_address, self.port)) diff --git a/platypush/backend/weather/forecast.py b/platypush/backend/weather/forecast.py index 686e81a092..f89faadb77 100644 --- a/platypush/backend/weather/forecast.py +++ b/platypush/backend/weather/forecast.py @@ -27,7 +27,8 @@ class WeatherForecastBackend(Backend): def send_message(self, msg): pass - def exec(self): + def run(self): + super().run() weather = get_plugin('weather.forecast') self.logger.info('Initialized weather forecast backend') diff --git a/platypush/bus/redis.py b/platypush/bus/redis.py index ccc5fd00cb..cc11cac3a3 100644 --- a/platypush/bus/redis.py +++ b/platypush/bus/redis.py @@ -24,6 +24,8 @@ class RedisBus(Bus): def get(self): """ Reads one message from the Redis queue """ + msg = None + try: msg = self.redis.blpop(self.redis_queue) if msg and msg[1]: diff --git a/platypush/plugins/adafruit/io.py b/platypush/plugins/adafruit/io.py index 8c98840e76..0a5b8d1ad6 100644 --- a/platypush/plugins/adafruit/io.py +++ b/platypush/plugins/adafruit/io.py @@ -75,7 +75,7 @@ class AdafruitIoPlugin(Plugin): def _get_redis(self): from redis import Redis - redis_args = get_backend('redis').redis_args.copy() + redis_args = get_backend('redis').redis_args redis_args['socket_timeout'] = 1 return Redis(**redis_args) diff --git a/platypush/plugins/light/hue/__init__.py b/platypush/plugins/light/hue/__init__.py index b67142cbe4..8536e70d85 100644 --- a/platypush/plugins/light/hue/__init__.py +++ b/platypush/plugins/light/hue/__init__.py @@ -706,7 +706,7 @@ class LightHuePlugin(LightPlugin): self.animation_thread = None self.redis = None - redis_args = get_backend('redis').redis_args.copy() + redis_args = get_backend('redis').redis_args redis_args['socket_timeout'] = transition_seconds self.redis = Redis(**redis_args)