From 6ce348365f8a5b10e16508b17e825b36e24511fd Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 25 Oct 2018 19:46:13 +0200 Subject: [PATCH] Refactored backends to be more robust by wrapping the core logic into a try-except logic with sleep and retry --- platypush/backend/__init__.py | 33 ++- .../backend/assistant/google/__init__.py | 3 +- .../backend/assistant/google/pushtotalk.py | 3 +- .../backend/assistant/snowboy/__init__.py | 3 +- platypush/backend/button/flic/__init__.py | 3 +- .../backend/button/flic/fliclib/aioflic.py | 206 ++++++++--------- .../backend/button/flic/fliclib/fliclib.py | 216 +++++++++--------- platypush/backend/camera/pi.py | 3 +- platypush/backend/http/__init__.py | 3 +- platypush/backend/http/poll/__init__.py | 3 +- platypush/backend/inotify/__init__.py | 3 +- platypush/backend/joystick.py | 3 +- platypush/backend/kafka/__init__.py | 3 +- platypush/backend/local/__init__.py | 3 +- platypush/backend/midi.py | 3 +- platypush/backend/mqtt.py | 3 +- platypush/backend/music/mpd/__init__.py | 3 +- platypush/backend/pushbullet/__init__.py | 3 +- platypush/backend/redis.py | 3 +- platypush/backend/scard/__init__.py | 3 +- platypush/backend/sensor/__init__.py | 3 +- .../backend/sensor/ir/zeroborg/__init__.py | 4 +- platypush/backend/sensor/leap.py | 4 +- platypush/backend/tcp.py | 4 +- platypush/backend/weather/forecast.py | 3 +- 25 files changed, 265 insertions(+), 259 deletions(-) diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index 3aafa7ee6..346ea5dd6 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -7,6 +7,7 @@ import importlib import logging import sys import threading +import time from threading import Thread @@ -34,6 +35,7 @@ class Backend(Thread): """ _default_response_timeout = 5 + _backend_reload_timeout = 10 def __init__(self, bus=None, **kwargs): """ @@ -211,9 +213,38 @@ class Backend(Thread): redis.send_message(msg, queue_name=queue_name) + def exec(self): + """ Backend thread logic. To be implemented in the derived classes """ + raise RuntimeError('Backend class {} does not implement the exec() method'. + format(self.__class__.__name__)) + def run(self): - """ Starts the backend thread. To be implemented in the derived classes """ + """ Thread runner. It wraps the exec method in a retry block """ + + super().run() self.thread_id = threading.get_ident() + error = None + + while not self.should_stop(): + try: + self.exec() + except Exception as e: + error = e + + if not self.should_stop(): + if error: + self.logger.error(('Backend {} terminated with an exception, ' + + 'reloading in {} seconds').format( + self.__class__.__name__, + self._backend_reload_timeout)) + self.logger.exception(error) + else: + self.logger.warning(('Backend {} unexpectedly terminated, ' + + 'reloading in {} seconds').format( + self.__class__.__name__, + self._backend_reload_timeout)) + + time.sleep(self._backend_reload_timeout) def on_stop(self): """ Callback invoked when the process stops """ diff --git a/platypush/backend/assistant/google/__init__.py b/platypush/backend/assistant/google/__init__.py index 07bbc27b8..154654c33 100644 --- a/platypush/backend/assistant/google/__init__.py +++ b/platypush/backend/assistant/google/__init__.py @@ -110,8 +110,7 @@ class AssistantGoogleBackend(Backend): self.assistant.stop_conversation() - def run(self): - super().run() + def exec(self): with Assistant(self.credentials, self.device_model_id) as assistant: self.assistant = assistant diff --git a/platypush/backend/assistant/google/pushtotalk.py b/platypush/backend/assistant/google/pushtotalk.py index 74d298630..862a917d9 100644 --- a/platypush/backend/assistant/google/pushtotalk.py +++ b/platypush/backend/assistant/google/pushtotalk.py @@ -176,9 +176,8 @@ class AssistantGooglePushtotalkBackend(Backend): """ Speech recognized handler """ self.bus.post(SpeechRecognizedEvent(phrase=speech)) - def run(self): + def exec(self): """ Backend executor """ - super().run() with SampleAssistant(self.lang, self.device_model_id, self.device_id, self.conversation_stream, diff --git a/platypush/backend/assistant/snowboy/__init__.py b/platypush/backend/assistant/snowboy/__init__.py index 6d63e2265..3e95bb003 100644 --- a/platypush/backend/assistant/snowboy/__init__.py +++ b/platypush/backend/assistant/snowboy/__init__.py @@ -72,8 +72,7 @@ class AssistantSnowboyBackend(Backend): self.bus.post(HotwordDetectedEvent(hotword=self.hotword)) return callback - def run(self): - super().run() + def exec(self): self.detector.start(self.hotword_detected()) diff --git a/platypush/backend/button/flic/__init__.py b/platypush/backend/button/flic/__init__.py index bf9f1f670..5378c3ac0 100644 --- a/platypush/backend/button/flic/__init__.py +++ b/platypush/backend/button/flic/__init__.py @@ -112,8 +112,7 @@ class ButtonFlicBackend(Backend): return _f - def run(self): - super().run() + def exec(self): self.client.handle_events() diff --git a/platypush/backend/button/flic/fliclib/aioflic.py b/platypush/backend/button/flic/fliclib/aioflic.py index 31c9d3df5..70787ffaa 100644 --- a/platypush/backend/button/flic/fliclib/aioflic.py +++ b/platypush/backend/button/flic/fliclib/aioflic.py @@ -36,12 +36,12 @@ class RemovedReason(Enum): RemovedByThisClient = 0 ForceDisconnectedByThisClient = 1 ForceDisconnectedByOtherClient = 2 - + ButtonIsPrivate = 3 VerifyTimeout = 4 InternetBackendError = 5 InvalidData = 6 - + CouldntLoadDevice = 7 class ClickType(Enum): @@ -77,22 +77,22 @@ class ScanWizardResult(Enum): class ButtonScanner: """ButtonScanner class. - + Usage: scanner = ButtonScanner() scanner.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: ... client.add_scanner(scanner) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_id = next(ButtonScanner._cnt) self.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: None class ScanWizard: """ScanWizard class - + Usage: wizard = ScanWizard() wizard.on_found_private_button = lambda scan_wizard: ... @@ -101,9 +101,9 @@ class ScanWizard: wizard.on_completed = lambda scan_wizard, result, bd_addr, name: ... client.add_scan_wizard(wizard) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_wizard_id = next(ScanWizard._cnt) self._bd_addr = None @@ -115,31 +115,31 @@ class ScanWizard: class ButtonConnectionChannel: """ButtonConnectionChannel class. - + This class represents a connection channel to a Flic button. Add this button connection channel to a FlicClient by executing client.add_connection_channel(connection_channel). You may only have this connection channel added to one FlicClient at a time. - + Before you add the connection channel to the client, you should set up your callback functions by assigning the corresponding properties to this object with a function. Each callback function has a channel parameter as the first one, referencing this object. - + Available properties and the function parameters are: on_create_connection_channel_response: channel, error, connection_status on_removed: channel, removed_reason on_connection_status_changed: channel, connection_status, disconnect_reason on_button_up_or_down / on_button_click_or_hold / on_button_single_or_double_click / on_button_single_or_double_click_or_hold: channel, click_type, was_queued, time_diff """ - + _cnt = itertools.count() - + def __init__(self, bd_addr, latency_mode = LatencyMode.NormalLatency, auto_disconnect_time = 511): self._conn_id = next(ButtonConnectionChannel._cnt) self._bd_addr = bd_addr self._latency_mode = latency_mode self._auto_disconnect_time = auto_disconnect_time self._client = None - + self.on_create_connection_channel_response = lambda channel, error, connection_status: None self.on_removed = lambda channel, removed_reason: None self.on_connection_status_changed = lambda channel, connection_status, disconnect_reason: None @@ -147,21 +147,21 @@ class ButtonConnectionChannel: self.on_button_click_or_hold = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click_or_hold = lambda channel, click_type, was_queued, time_diff: None - + @property def bd_addr(self): return self._bd_addr - + @property def latency_mode(self): return self._latency_mode - + @latency_mode.setter def latency_mode(self, latency_mode): if self._client is None: self._latency_mode = latency_mode return - + self._latency_mode = latency_mode if not self._client._closed: self._client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time}) @@ -169,39 +169,39 @@ class ButtonConnectionChannel: @property def auto_disconnect_time(self): return self._auto_disconnect_time - + @auto_disconnect_time.setter def auto_disconnect_time(self, auto_disconnect_time): if self._client is None: self._auto_disconnect_time = auto_disconnect_time return - + self._auto_disconnect_time = auto_disconnect_time if not self._client._closed: self._client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time}) class FlicClient(asyncio.Protocol): """FlicClient class. - + When this class is constructed, a socket connection is established. You may then send commands to the server and set timers. Once you are ready with the initialization you must call the handle_events() method which is a main loop that never exits, unless the socket is closed. For a more detailed description of all commands, events and enums, check the protocol specification. - + All commands are wrapped in more high level functions and events are reported using callback functions. - + All methods called on this class will take effect only if you eventually call the handle_events() method. - + The ButtonScanner is used to set up a handler for advertisement packets. The ButtonConnectionChannel is used to interact with connections to flic buttons and receive their events. - + Other events are handled by the following callback functions that can be assigned to this object (and a list of the callback function parameters): on_new_verified_button: bd_addr on_no_space_for_new_connection: max_concurrently_connected_buttons on_got_space_for_new_connection: max_concurrently_connected_buttons on_bluetooth_controller_state_change: state """ - + _EVENTS = [ ("EvtAdvertisementPacket", "= len(FlicClient._EVENTS) or FlicClient._EVENTS[opcode] == None: return - + event_name = FlicClient._EVENTS[opcode][0] data_tuple = FlicClient._EVENT_STRUCTS[opcode].unpack(data[1 : 1 + FlicClient._EVENT_STRUCTS[opcode].size]) items = FlicClient._EVENT_NAMED_TUPLES[opcode]._make(data_tuple)._asdict() - + # Process some kind of items whose data type is not supported by struct if "bd_addr" in items: items["bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["bd_addr"]) - + if "name" in items: items["name"] = items["name"].decode("utf-8") - + if event_name == "EvtCreateConnectionChannelResponse": items["error"] = CreateConnectionChannelError(items["error"]) items["connection_status"] = ConnectionStatus(items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": items["connection_status"] = ConnectionStatus(items["connection_status"]) items["disconnect_reason"] = DisconnectReason(items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": items["removed_reason"] = RemovedReason(items["removed_reason"]) - + if event_name.startswith("EvtButton"): items["click_type"] = ClickType(items["click_type"]) - + if event_name == "EvtGetInfoResponse": items["bluetooth_controller_state"] = BluetoothControllerState(items["bluetooth_controller_state"]) items["my_bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["my_bd_addr"]) items["my_bd_addr_type"] = BdAddrType(items["my_bd_addr_type"]) items["bd_addr_of_verified_buttons"] = [] - + pos = FlicClient._EVENT_STRUCTS[opcode].size for i in range(items["nb_verified_buttons"]): items["bd_addr_of_verified_buttons"].append(FlicClient._bdaddr_bytes_to_string(data[1 + pos : 1 + pos + 6])) pos += 6 - + if event_name == "EvtBluetoothControllerStateChange": items["state"] = BluetoothControllerState(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": items["uuid"] = "".join(map(lambda x: "%02x" % x, items["uuid"])) if items["uuid"] == "00000000000000000000000000000000": items["uuid"] = None - + if event_name == "EvtScanWizardCompleted": items["result"] = ScanWizardResult(items["result"]) - + # Process event if event_name == "EvtAdvertisementPacket": scanner = self._scanners.get(items["scan_id"]) if scanner is not None: scanner.on_advertisement_packet(scanner, items["bd_addr"], items["name"], items["rssi"], items["is_private"], items["already_verified"]) - + if event_name == "EvtCreateConnectionChannelResponse": channel = self._connection_channels[items["conn_id"]] if items["error"] != CreateConnectionChannelError.NoError: del self._connection_channels[items["conn_id"]] channel.on_create_connection_channel_response(channel, items["error"], items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": channel = self._connection_channels[items["conn_id"]] channel.on_connection_status_changed(channel, items["connection_status"], items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": channel = self._connection_channels[items["conn_id"]] del self._connection_channels[items["conn_id"]] channel.on_removed(channel, items["removed_reason"]) - + if event_name == "EvtButtonUpOrDown": channel = self._connection_channels[items["conn_id"]] channel.on_button_up_or_down(channel, items["click_type"], items["was_queued"], items["time_diff"]) @@ -498,45 +498,45 @@ class FlicClient(asyncio.Protocol): if event_name == "EvtButtonSingleOrDoubleClickOrHold": channel = self._connection_channels[items["conn_id"]] channel.on_button_single_or_double_click_or_hold(channel, items["click_type"], items["was_queued"], items["time_diff"]) - + if event_name == "EvtNewVerifiedButton": self.on_new_verified_button(items["bd_addr"]) - + if event_name == "EvtGetInfoResponse": self.on_get_info(items) - + if event_name == "EvtNoSpaceForNewConnection": self.on_no_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtGotSpaceForNewConnection": self.on_got_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtBluetoothControllerStateChange": self.on_bluetooth_controller_state_change(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": self.on_get_button_uuid(items["bd_addr"], items["uuid"]) - + if event_name == "EvtScanWizardFoundPrivateButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_found_private_button(scan_wizard) - + if event_name == "EvtScanWizardFoundPublicButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard._bd_addr = items["bd_addr"] scan_wizard._name = items["name"] scan_wizard.on_found_public_button(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardButtonConnected": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_button_connected(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardCompleted": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] del self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_completed(scan_wizard, items["result"], scan_wizard._bd_addr, scan_wizard._name) - - + + def data_received(self,data): cdata=self.buffer+data self.buffer=b"" @@ -550,5 +550,5 @@ class FlicClient(asyncio.Protocol): if len(cdata): self.buffer=cdata #unlikely to happen but..... break - + diff --git a/platypush/backend/button/flic/fliclib/fliclib.py b/platypush/backend/button/flic/fliclib/fliclib.py index 6e48811ff..2a0e7959f 100644 --- a/platypush/backend/button/flic/fliclib/fliclib.py +++ b/platypush/backend/button/flic/fliclib/fliclib.py @@ -40,12 +40,12 @@ class RemovedReason(Enum): RemovedByThisClient = 0 ForceDisconnectedByThisClient = 1 ForceDisconnectedByOtherClient = 2 - + ButtonIsPrivate = 3 VerifyTimeout = 4 InternetBackendError = 5 InvalidData = 6 - + CouldntLoadDevice = 7 class ClickType(Enum): @@ -81,22 +81,22 @@ class ScanWizardResult(Enum): class ButtonScanner: """ButtonScanner class. - + Usage: scanner = ButtonScanner() scanner.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: ... client.add_scanner(scanner) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_id = next(ButtonScanner._cnt) self.on_advertisement_packet = lambda scanner, bd_addr, name, rssi, is_private, already_verified: None class ScanWizard: """ScanWizard class - + Usage: wizard = ScanWizard() wizard.on_found_private_button = lambda scan_wizard: ... @@ -105,9 +105,9 @@ class ScanWizard: wizard.on_completed = lambda scan_wizard, result, bd_addr, name: ... client.add_scan_wizard(wizard) """ - + _cnt = itertools.count() - + def __init__(self): self._scan_wizard_id = next(ScanWizard._cnt) self._bd_addr = None @@ -119,31 +119,31 @@ class ScanWizard: class ButtonConnectionChannel: """ButtonConnectionChannel class. - + This class represents a connection channel to a Flic button. Add this button connection channel to a FlicClient by executing client.add_connection_channel(connection_channel). You may only have this connection channel added to one FlicClient at a time. - + Before you add the connection channel to the client, you should set up your callback functions by assigning the corresponding properties to this object with a function. Each callback function has a channel parameter as the first one, referencing this object. - + Available properties and the function parameters are: on_create_connection_channel_response: channel, error, connection_status on_removed: channel, removed_reason on_connection_status_changed: channel, connection_status, disconnect_reason on_button_up_or_down / on_button_click_or_hold / on_button_single_or_double_click / on_button_single_or_double_click_or_hold: channel, click_type, was_queued, time_diff """ - + _cnt = itertools.count() - + def __init__(self, bd_addr, latency_mode = LatencyMode.NormalLatency, auto_disconnect_time = 511): self._conn_id = next(ButtonConnectionChannel._cnt) self._bd_addr = bd_addr self._latency_mode = latency_mode self._auto_disconnect_time = auto_disconnect_time self._client = None - + self.on_create_connection_channel_response = lambda channel, error, connection_status: None self.on_removed = lambda channel, removed_reason: None self.on_connection_status_changed = lambda channel, connection_status, disconnect_reason: None @@ -151,36 +151,36 @@ class ButtonConnectionChannel: self.on_button_click_or_hold = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click = lambda channel, click_type, was_queued, time_diff: None self.on_button_single_or_double_click_or_hold = lambda channel, click_type, was_queued, time_diff: None - + @property def bd_addr(self): return self._bd_addr - + @property def latency_mode(self): return self._latency_mode - + @latency_mode.setter def latency_mode(self, latency_mode): if self._client is None: self._latency_mode = latency_mode return - + with self._client._lock: self._latency_mode = latency_mode if not self._client._closed: self._client._send_command("CmdChangeModeParameters", {"conn_id": self._conn_id, "latency_mode": self._latency_mode, "auto_disconnect_time": self._auto_disconnect_time}) - + @property def auto_disconnect_time(self): return self._auto_disconnect_time - + @auto_disconnect_time.setter def auto_disconnect_time(self, auto_disconnect_time): if self._client is None: self._auto_disconnect_time = auto_disconnect_time return - + with self._client._lock: self._auto_disconnect_time = auto_disconnect_time if not self._client._closed: @@ -188,26 +188,26 @@ class ButtonConnectionChannel: class FlicClient: """FlicClient class. - + When this class is constructed, a socket connection is established. You may then send commands to the server and set timers. Once you are ready with the initialization you must call the handle_events() method which is a main loop that never exits, unless the socket is closed. For a more detailed description of all commands, events and enums, check the protocol specification. - + All commands are wrapped in more high level functions and events are reported using callback functions. - + All methods called on this class will take effect only if you eventually call the handle_events() method. - + The ButtonScanner is used to set up a handler for advertisement packets. The ButtonConnectionChannel is used to interact with connections to flic buttons and receive their events. - + Other events are handled by the following callback functions that can be assigned to this object (and a list of the callback function parameters): on_new_verified_button: bd_addr on_no_space_for_new_connection: max_concurrently_connected_buttons on_got_space_for_new_connection: max_concurrently_connected_buttons on_bluetooth_controller_state_change: state """ - + _EVENTS = [ ("EvtAdvertisementPacket", "= len(FlicClient._EVENTS) or FlicClient._EVENTS[opcode] == None: return - + event_name = FlicClient._EVENTS[opcode][0] data_tuple = FlicClient._EVENT_STRUCTS[opcode].unpack(data[1 : 1 + FlicClient._EVENT_STRUCTS[opcode].size]) items = FlicClient._EVENT_NAMED_TUPLES[opcode]._make(data_tuple)._asdict() - + # Process some kind of items whose data type is not supported by struct if "bd_addr" in items: items["bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["bd_addr"]) - + if "name" in items: items["name"] = items["name"].decode("utf-8") - + if event_name == "EvtCreateConnectionChannelResponse": items["error"] = CreateConnectionChannelError(items["error"]) items["connection_status"] = ConnectionStatus(items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": items["connection_status"] = ConnectionStatus(items["connection_status"]) items["disconnect_reason"] = DisconnectReason(items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": items["removed_reason"] = RemovedReason(items["removed_reason"]) - + if event_name.startswith("EvtButton"): items["click_type"] = ClickType(items["click_type"]) - + if event_name == "EvtGetInfoResponse": items["bluetooth_controller_state"] = BluetoothControllerState(items["bluetooth_controller_state"]) items["my_bd_addr"] = FlicClient._bdaddr_bytes_to_string(items["my_bd_addr"]) items["my_bd_addr_type"] = BdAddrType(items["my_bd_addr_type"]) items["bd_addr_of_verified_buttons"] = [] - + pos = FlicClient._EVENT_STRUCTS[opcode].size for i in range(items["nb_verified_buttons"]): items["bd_addr_of_verified_buttons"].append(FlicClient._bdaddr_bytes_to_string(data[1 + pos : 1 + pos + 6])) pos += 6 - + if event_name == "EvtBluetoothControllerStateChange": items["state"] = BluetoothControllerState(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": items["uuid"] = "".join(map(lambda x: "%02x" % x, items["uuid"])) if items["uuid"] == "00000000000000000000000000000000": items["uuid"] = None - + if event_name == "EvtScanWizardCompleted": items["result"] = ScanWizardResult(items["result"]) - + # Process event if event_name == "EvtAdvertisementPacket": scanner = self._scanners.get(items["scan_id"]) if scanner is not None: scanner.on_advertisement_packet(scanner, items["bd_addr"], items["name"], items["rssi"], items["is_private"], items["already_verified"]) - + if event_name == "EvtCreateConnectionChannelResponse": channel = self._connection_channels[items["conn_id"]] if items["error"] != CreateConnectionChannelError.NoError: del self._connection_channels[items["conn_id"]] channel.on_create_connection_channel_response(channel, items["error"], items["connection_status"]) - + if event_name == "EvtConnectionStatusChanged": channel = self._connection_channels[items["conn_id"]] channel.on_connection_status_changed(channel, items["connection_status"], items["disconnect_reason"]) - + if event_name == "EvtConnectionChannelRemoved": channel = self._connection_channels[items["conn_id"]] del self._connection_channels[items["conn_id"]] channel.on_removed(channel, items["removed_reason"]) - + if event_name == "EvtButtonUpOrDown": channel = self._connection_channels[items["conn_id"]] channel.on_button_up_or_down(channel, items["click_type"], items["was_queued"], items["time_diff"]) @@ -521,44 +521,44 @@ class FlicClient: if event_name == "EvtButtonSingleOrDoubleClickOrHold": channel = self._connection_channels[items["conn_id"]] channel.on_button_single_or_double_click_or_hold(channel, items["click_type"], items["was_queued"], items["time_diff"]) - + if event_name == "EvtNewVerifiedButton": self.on_new_verified_button(items["bd_addr"]) - + if event_name == "EvtGetInfoResponse": self._get_info_response_queue.get()(items) - + if event_name == "EvtNoSpaceForNewConnection": self.on_no_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtGotSpaceForNewConnection": self.on_got_space_for_new_connection(items["max_concurrently_connected_buttons"]) - + if event_name == "EvtBluetoothControllerStateChange": self.on_bluetooth_controller_state_change(items["state"]) - + if event_name == "EvtGetButtonUUIDResponse": self._get_button_uuid_queue.get()(items["bd_addr"], items["uuid"]) - + if event_name == "EvtScanWizardFoundPrivateButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_found_private_button(scan_wizard) - + if event_name == "EvtScanWizardFoundPublicButton": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard._bd_addr = items["bd_addr"] scan_wizard._name = items["name"] scan_wizard.on_found_public_button(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardButtonConnected": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_button_connected(scan_wizard, scan_wizard._bd_addr, scan_wizard._name) - + if event_name == "EvtScanWizardCompleted": scan_wizard = self._scan_wizards[items["scan_wizard_id"]] del self._scan_wizards[items["scan_wizard_id"]] scan_wizard.on_completed(scan_wizard, items["result"], scan_wizard._bd_addr, scan_wizard._name) - + def _handle_one_event(self): if len(self._timers.queue) > 0: current_timer = self._timers.queue[0] @@ -568,10 +568,10 @@ class FlicClient: return True if len(select.select([self._sock], [], [], timeout)[0]) == 0: return True - + len_arr = bytearray(2) view = memoryview(len_arr) - + toread = 2 while toread > 0: nbytes = self._sock.recv_into(view, toread) @@ -579,7 +579,7 @@ class FlicClient: return False view = view[nbytes:] toread -= nbytes - + packet_len = len_arr[0] | (len_arr[1] << 8) data = bytearray(packet_len) view = memoryview(data) @@ -590,13 +590,13 @@ class FlicClient: return False view = view[nbytes:] toread -= nbytes - + self._dispatch_event(data) return True - + def handle_events(self): """Start the main loop for this client. - + This method will not return until the socket has been closed. Once it has returned, any use of this FlicClient is illegal. """ diff --git a/platypush/backend/camera/pi.py b/platypush/backend/camera/pi.py index 7c419a015..87b56bd25 100644 --- a/platypush/backend/camera/pi.py +++ b/platypush/backend/camera/pi.py @@ -160,8 +160,7 @@ class CameraPiBackend(Backend): self.logger.warning('Failed to stop recording') self.logger.exception(e) - def run(self): - super().run() + def exec(self): if not self.redis: self.redis = get_backend('redis') diff --git a/platypush/backend/http/__init__.py b/platypush/backend/http/__init__.py index f7afa70b4..9eef5d0cb 100644 --- a/platypush/backend/http/__init__.py +++ b/platypush/backend/http/__init__.py @@ -377,8 +377,7 @@ class HttpBackend(Backend): loop.run_forever() - def run(self): - super().run() + def exec(self): os.putenv('FLASK_APP', 'platypush') os.putenv('FLASK_ENV', 'production') self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) diff --git a/platypush/backend/http/poll/__init__.py b/platypush/backend/http/poll/__init__.py index 52dccea27..e8ed95dc3 100644 --- a/platypush/backend/http/poll/__init__.py +++ b/platypush/backend/http/poll/__init__.py @@ -72,8 +72,7 @@ class HttpPollBackend(Backend): self.requests.append(request) - def run(self): - super().run() + def exec(self): while not self.should_stop(): for request in self.requests: diff --git a/platypush/backend/inotify/__init__.py b/platypush/backend/inotify/__init__.py index 8dae146b1..6336858df 100644 --- a/platypush/backend/inotify/__init__.py +++ b/platypush/backend/inotify/__init__.py @@ -48,8 +48,7 @@ class InotifyBackend(Backend): self.inotify_watch = None - def run(self): - super().run() + def exec(self): self.inotify_watch = inotify.adapters.Inotify() for path in self.watch_paths: diff --git a/platypush/backend/joystick.py b/platypush/backend/joystick.py index f3041492f..d6554f447 100644 --- a/platypush/backend/joystick.py +++ b/platypush/backend/joystick.py @@ -29,8 +29,7 @@ class JoystickBackend(Backend): self.device = device - def run(self): - super().run() + def exec(self): self.logger.info('Initialized joystick backend on device {}'.format(self.device)) while not self.should_stop(): diff --git a/platypush/backend/kafka/__init__.py b/platypush/backend/kafka/__init__.py index bce8b0529..e8b00d520 100644 --- a/platypush/backend/kafka/__init__.py +++ b/platypush/backend/kafka/__init__.py @@ -81,8 +81,7 @@ class KafkaBackend(Backend): self.logger.warning('Exception occurred while closing Kafka connection') self.logger.exception(e) - def run(self): - super().run() + def exec(self): self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) self.logger.info('Initialized kafka backend - server: {}, topic: {}' diff --git a/platypush/backend/local/__init__.py b/platypush/backend/local/__init__.py index 398ab5f12..19e8a8637 100644 --- a/platypush/backend/local/__init__.py +++ b/platypush/backend/local/__init__.py @@ -61,8 +61,7 @@ class LocalBackend(Backend): return Message.build(msg) if len(msg) else None - def run(self): - super().run() + def exec(self): self.logger.info('Initialized local backend on {} and {}'. format(self.request_fifo, self.response_fifo)) diff --git a/platypush/backend/midi.py b/platypush/backend/midi.py index e4f08630b..521472911 100644 --- a/platypush/backend/midi.py +++ b/platypush/backend/midi.py @@ -102,8 +102,7 @@ class MidiBackend(Backend): return callback - def run(self): - super().run() + def exec(self): self.midi.open_port(self.port_number) self.logger.info('Initialized MIDI backend, listening for events on device {}'. diff --git a/platypush/backend/mqtt.py b/platypush/backend/mqtt.py index 899897d4b..b2701fc91 100644 --- a/platypush/backend/mqtt.py +++ b/platypush/backend/mqtt.py @@ -45,7 +45,7 @@ class MqttBackend(Backend): publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) - def run(self): + def exec(self): def on_connect(client, userdata, flags, rc): client.subscribe(self.topic) @@ -57,7 +57,6 @@ class MqttBackend(Backend): self.logger.info('Received message on the MQTT backend: {}'.format(msg)) self.on_message(msg) - super().run() client = mqtt.Client() client.on_connect = on_connect client.on_message = on_message diff --git a/platypush/backend/music/mpd/__init__.py b/platypush/backend/music/mpd/__init__.py index 14f426192..64455348a 100644 --- a/platypush/backend/music/mpd/__init__.py +++ b/platypush/backend/music/mpd/__init__.py @@ -39,8 +39,7 @@ class MusicMpdBackend(Backend): self.poll_seconds = poll_seconds - def run(self): - super().run() + def exec(self): last_status = {} last_state = None diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index 251fbc752..91ff6a628 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -175,8 +175,7 @@ class PushbulletBackend(Backend): def on_stop(self): self.ws.close() - def run(self): - super().run() + def exec(self): self._init_socket() self.logger.info('Initialized Pushbullet backend - device_id: {}' diff --git a/platypush/backend/redis.py b/platypush/backend/redis.py index 352f92159..cdb2f2be3 100644 --- a/platypush/backend/redis.py +++ b/platypush/backend/redis.py @@ -60,8 +60,7 @@ class RedisBackend(Backend): return msg - def run(self): - super().run() + def exec(self): self.logger.info('Initialized Redis backend on queue {} with arguments {}'. format(self.queue, self.redis_args)) diff --git a/platypush/backend/scard/__init__.py b/platypush/backend/scard/__init__.py index 2dcf231ed..1ea3cb255 100644 --- a/platypush/backend/scard/__init__.py +++ b/platypush/backend/scard/__init__.py @@ -50,8 +50,7 @@ class ScardBackend(Backend): self.cardtype = AnyCardType() - def run(self): - super().run() + def exec(self): self.logger.info('Initialized smart card reader backend - ATR filter: {}'. format(self.ATRs)) diff --git a/platypush/backend/sensor/__init__.py b/platypush/backend/sensor/__init__.py index 1ce1f86f2..7d065a97c 100644 --- a/platypush/backend/sensor/__init__.py +++ b/platypush/backend/sensor/__init__.py @@ -40,8 +40,7 @@ class SensorBackend(Backend): """ To be implemented in the derived classes """ raise NotImplementedError('To be implemented in a derived class') - def run(self): - super().run() + def exec(self): self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__)) while not self.should_stop(): diff --git a/platypush/backend/sensor/ir/zeroborg/__init__.py b/platypush/backend/sensor/ir/zeroborg/__init__.py index a26af8b81..b17c5acaa 100644 --- a/platypush/backend/sensor/ir/zeroborg/__init__.py +++ b/platypush/backend/sensor/ir/zeroborg/__init__.py @@ -32,9 +32,7 @@ class SensorIrZeroborgBackend(Backend): self.logger.info('Initialized Zeroborg infrared sensor backend') - def run(self): - super().run() - + def exec(self): while True: try: self.zb.GetIrMessage() diff --git a/platypush/backend/sensor/leap.py b/platypush/backend/sensor/leap.py index 8caed2b5b..4475edd43 100644 --- a/platypush/backend/sensor/leap.py +++ b/platypush/backend/sensor/leap.py @@ -61,9 +61,7 @@ class SensorLeapBackend(Backend): self.position_tolerance = position_tolerance - def run(self): - super().run() - + def exec(self): listener = LeapListener(position_ranges=self.position_ranges, position_tolerance=self.position_tolerance) diff --git a/platypush/backend/tcp.py b/platypush/backend/tcp.py index 37faf63ec..69bd5ce71 100644 --- a/platypush/backend/tcp.py +++ b/platypush/backend/tcp.py @@ -90,9 +90,7 @@ class TcpBackend(Backend): threading.Thread(target=_f_wrapper).run() - def run(self): - super().run() - + def exec(self): serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv_sock.bind((self.bind_address, self.port)) diff --git a/platypush/backend/weather/forecast.py b/platypush/backend/weather/forecast.py index f89faadb7..686e81a09 100644 --- a/platypush/backend/weather/forecast.py +++ b/platypush/backend/weather/forecast.py @@ -27,8 +27,7 @@ class WeatherForecastBackend(Backend): def send_message(self, msg): pass - def run(self): - super().run() + def exec(self): weather = get_plugin('weather.forecast') self.logger.info('Initialized weather forecast backend')