Refactored backends to be more robust by wrapping the core logic into a try-except logic with sleep and retry

This commit is contained in:
Fabio Manganiello 2018-10-25 19:46:13 +02:00
parent 0a7722d858
commit 6ce348365f
25 changed files with 265 additions and 259 deletions

View file

@ -7,6 +7,7 @@ import importlib
import logging import logging
import sys import sys
import threading import threading
import time
from threading import Thread from threading import Thread
@ -34,6 +35,7 @@ class Backend(Thread):
""" """
_default_response_timeout = 5 _default_response_timeout = 5
_backend_reload_timeout = 10
def __init__(self, bus=None, **kwargs): def __init__(self, bus=None, **kwargs):
""" """
@ -211,9 +213,38 @@ class Backend(Thread):
redis.send_message(msg, queue_name=queue_name) redis.send_message(msg, queue_name=queue_name)
def exec(self):
""" Backend thread logic. To be implemented in the derived classes """
raise RuntimeError('Backend class {} does not implement the exec() method'.
format(self.__class__.__name__))
def run(self): def run(self):
""" Starts the backend thread. To be implemented in the derived classes """ """ Thread runner. It wraps the exec method in a retry block """
super().run()
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()
error = None
while not self.should_stop():
try:
self.exec()
except Exception as e:
error = e
if not self.should_stop():
if error:
self.logger.error(('Backend {} terminated with an exception, ' +
'reloading in {} seconds').format(
self.__class__.__name__,
self._backend_reload_timeout))
self.logger.exception(error)
else:
self.logger.warning(('Backend {} unexpectedly terminated, ' +
'reloading in {} seconds').format(
self.__class__.__name__,
self._backend_reload_timeout))
time.sleep(self._backend_reload_timeout)
def on_stop(self): def on_stop(self):
""" Callback invoked when the process stops """ """ Callback invoked when the process stops """

View file

@ -110,8 +110,7 @@ class AssistantGoogleBackend(Backend):
self.assistant.stop_conversation() self.assistant.stop_conversation()
def run(self): def exec(self):
super().run()
with Assistant(self.credentials, self.device_model_id) as assistant: with Assistant(self.credentials, self.device_model_id) as assistant:
self.assistant = assistant self.assistant = assistant

View file

@ -176,9 +176,8 @@ class AssistantGooglePushtotalkBackend(Backend):
""" Speech recognized handler """ """ Speech recognized handler """
self.bus.post(SpeechRecognizedEvent(phrase=speech)) self.bus.post(SpeechRecognizedEvent(phrase=speech))
def run(self): def exec(self):
""" Backend executor """ """ Backend executor """
super().run()
with SampleAssistant(self.lang, self.device_model_id, self.device_id, with SampleAssistant(self.lang, self.device_model_id, self.device_id,
self.conversation_stream, self.conversation_stream,

View file

@ -72,8 +72,7 @@ class AssistantSnowboyBackend(Backend):
self.bus.post(HotwordDetectedEvent(hotword=self.hotword)) self.bus.post(HotwordDetectedEvent(hotword=self.hotword))
return callback return callback
def run(self): def exec(self):
super().run()
self.detector.start(self.hotword_detected()) self.detector.start(self.hotword_detected())

View file

@ -112,8 +112,7 @@ class ButtonFlicBackend(Backend):
return _f return _f
def run(self): def exec(self):
super().run()
self.client.handle_events() self.client.handle_events()

View file

@ -160,8 +160,7 @@ class CameraPiBackend(Backend):
self.logger.warning('Failed to stop recording') self.logger.warning('Failed to stop recording')
self.logger.exception(e) self.logger.exception(e)
def run(self): def exec(self):
super().run()
if not self.redis: if not self.redis:
self.redis = get_backend('redis') self.redis = get_backend('redis')

View file

@ -377,8 +377,7 @@ class HttpBackend(Backend):
loop.run_forever() loop.run_forever()
def run(self): def exec(self):
super().run()
os.putenv('FLASK_APP', 'platypush') os.putenv('FLASK_APP', 'platypush')
os.putenv('FLASK_ENV', 'production') os.putenv('FLASK_ENV', 'production')
self.logger.info('Initialized HTTP backend on port {}'.format(self.port)) self.logger.info('Initialized HTTP backend on port {}'.format(self.port))

View file

@ -72,8 +72,7 @@ class HttpPollBackend(Backend):
self.requests.append(request) self.requests.append(request)
def run(self): def exec(self):
super().run()
while not self.should_stop(): while not self.should_stop():
for request in self.requests: for request in self.requests:

View file

@ -48,8 +48,7 @@ class InotifyBackend(Backend):
self.inotify_watch = None self.inotify_watch = None
def run(self): def exec(self):
super().run()
self.inotify_watch = inotify.adapters.Inotify() self.inotify_watch = inotify.adapters.Inotify()
for path in self.watch_paths: for path in self.watch_paths:

View file

@ -29,8 +29,7 @@ class JoystickBackend(Backend):
self.device = device self.device = device
def run(self): def exec(self):
super().run()
self.logger.info('Initialized joystick backend on device {}'.format(self.device)) self.logger.info('Initialized joystick backend on device {}'.format(self.device))
while not self.should_stop(): while not self.should_stop():

View file

@ -81,8 +81,7 @@ class KafkaBackend(Backend):
self.logger.warning('Exception occurred while closing Kafka connection') self.logger.warning('Exception occurred while closing Kafka connection')
self.logger.exception(e) self.logger.exception(e)
def run(self): def exec(self):
super().run()
self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server) self.consumer = KafkaConsumer(self.topic, bootstrap_servers=self.server)
self.logger.info('Initialized kafka backend - server: {}, topic: {}' self.logger.info('Initialized kafka backend - server: {}, topic: {}'

View file

@ -61,8 +61,7 @@ class LocalBackend(Backend):
return Message.build(msg) if len(msg) else None return Message.build(msg) if len(msg) else None
def run(self): def exec(self):
super().run()
self.logger.info('Initialized local backend on {} and {}'. self.logger.info('Initialized local backend on {} and {}'.
format(self.request_fifo, self.response_fifo)) format(self.request_fifo, self.response_fifo))

View file

@ -102,8 +102,7 @@ class MidiBackend(Backend):
return callback return callback
def run(self): def exec(self):
super().run()
self.midi.open_port(self.port_number) self.midi.open_port(self.port_number)
self.logger.info('Initialized MIDI backend, listening for events on device {}'. self.logger.info('Initialized MIDI backend, listening for events on device {}'.

View file

@ -45,7 +45,7 @@ class MqttBackend(Backend):
publisher.single(self.topic, str(msg), hostname=self.host, port=self.port) publisher.single(self.topic, str(msg), hostname=self.host, port=self.port)
def run(self): def exec(self):
def on_connect(client, userdata, flags, rc): def on_connect(client, userdata, flags, rc):
client.subscribe(self.topic) client.subscribe(self.topic)
@ -57,7 +57,6 @@ class MqttBackend(Backend):
self.logger.info('Received message on the MQTT backend: {}'.format(msg)) self.logger.info('Received message on the MQTT backend: {}'.format(msg))
self.on_message(msg) self.on_message(msg)
super().run()
client = mqtt.Client() client = mqtt.Client()
client.on_connect = on_connect client.on_connect = on_connect
client.on_message = on_message client.on_message = on_message

View file

@ -39,8 +39,7 @@ class MusicMpdBackend(Backend):
self.poll_seconds = poll_seconds self.poll_seconds = poll_seconds
def run(self): def exec(self):
super().run()
last_status = {} last_status = {}
last_state = None last_state = None

View file

@ -175,8 +175,7 @@ class PushbulletBackend(Backend):
def on_stop(self): def on_stop(self):
self.ws.close() self.ws.close()
def run(self): def exec(self):
super().run()
self._init_socket() self._init_socket()
self.logger.info('Initialized Pushbullet backend - device_id: {}' self.logger.info('Initialized Pushbullet backend - device_id: {}'

View file

@ -60,8 +60,7 @@ class RedisBackend(Backend):
return msg return msg
def run(self): def exec(self):
super().run()
self.logger.info('Initialized Redis backend on queue {} with arguments {}'. self.logger.info('Initialized Redis backend on queue {} with arguments {}'.
format(self.queue, self.redis_args)) format(self.queue, self.redis_args))

View file

@ -50,8 +50,7 @@ class ScardBackend(Backend):
self.cardtype = AnyCardType() self.cardtype = AnyCardType()
def run(self): def exec(self):
super().run()
self.logger.info('Initialized smart card reader backend - ATR filter: {}'. self.logger.info('Initialized smart card reader backend - ATR filter: {}'.
format(self.ATRs)) format(self.ATRs))

View file

@ -40,8 +40,7 @@ class SensorBackend(Backend):
""" To be implemented in the derived classes """ """ To be implemented in the derived classes """
raise NotImplementedError('To be implemented in a derived class') raise NotImplementedError('To be implemented in a derived class')
def run(self): def exec(self):
super().run()
self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__)) self.logger.info('Initialized {} sensor backend'.format(self.__class__.__name__))
while not self.should_stop(): while not self.should_stop():

View file

@ -32,9 +32,7 @@ class SensorIrZeroborgBackend(Backend):
self.logger.info('Initialized Zeroborg infrared sensor backend') self.logger.info('Initialized Zeroborg infrared sensor backend')
def run(self): def exec(self):
super().run()
while True: while True:
try: try:
self.zb.GetIrMessage() self.zb.GetIrMessage()

View file

@ -61,9 +61,7 @@ class SensorLeapBackend(Backend):
self.position_tolerance = position_tolerance self.position_tolerance = position_tolerance
def run(self): def exec(self):
super().run()
listener = LeapListener(position_ranges=self.position_ranges, listener = LeapListener(position_ranges=self.position_ranges,
position_tolerance=self.position_tolerance) position_tolerance=self.position_tolerance)

View file

@ -90,9 +90,7 @@ class TcpBackend(Backend):
threading.Thread(target=_f_wrapper).run() threading.Thread(target=_f_wrapper).run()
def run(self): def exec(self):
super().run()
serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serv_sock.bind((self.bind_address, self.port)) serv_sock.bind((self.bind_address, self.port))

View file

@ -27,8 +27,7 @@ class WeatherForecastBackend(Backend):
def send_message(self, msg): def send_message(self, msg):
pass pass
def run(self): def exec(self):
super().run()
weather = get_plugin('weather.forecast') weather = get_plugin('weather.forecast')
self.logger.info('Initialized weather forecast backend') self.logger.info('Initialized weather forecast backend')