More robust logic to handle temporary database connection errors through retry mechanism

This commit is contained in:
Fabio Manganiello 2019-07-16 16:40:56 +02:00
parent 1ea737c15a
commit 4e8235a649

View file

@ -2,7 +2,10 @@
.. moduleauthor:: Fabio Manganiello <blacklight86@gmail.com>
"""
import time
from sqlalchemy import create_engine, Table, MetaData
from sqlalchemy.engine import Engine
from platypush.plugins import Plugin, action
@ -18,6 +21,8 @@ class DbPlugin(Plugin):
"""
engine = None
_db_error_wait_interval = 5.0
_db_error_retries = 3
def __init__(self, engine=None, *args, **kwargs):
"""
@ -32,9 +37,12 @@ class DbPlugin(Plugin):
def _get_engine(self, engine=None, *args, **kwargs):
if engine:
if isinstance(engine, Engine):
return engine
return create_engine(engine, *args, **kwargs)
else:
return self.engine
return self.engine
@staticmethod
def _build_condition(table, column, value):
@ -70,6 +78,34 @@ class DbPlugin(Plugin):
with engine.connect() as connection:
result = connection.execute(statement)
def _get_table(self, table, engine=None, *args, **kwargs):
if not engine:
engine = self._get_engine(engine, *args, **kwargs)
db_ok = False
n_tries = 0
last_error = None
while not db_ok and n_tries < self._db_error_retries:
try:
n_tries += 1
metadata = MetaData()
table = Table(table, metadata, autoload=True, autoload_with=engine)
db_ok = True
except Exception as e:
last_error = e
wait_time = self._db_error_wait_interval * n_tries
self.logger.exception(e)
self.logger.info('Waiting {} seconds before retrying'.format(wait_time))
time.sleep(wait_time)
engine = self._get_engine(engine, *args, **kwargs)
if not db_ok and last_error:
raise last_error
return table, engine
@action
def select(self, query=None, table=None, filter=None, engine=None, *args, **kwargs):
"""
@ -128,11 +164,10 @@ class DbPlugin(Plugin):
]
"""
db = self._get_engine(engine, *args, **kwargs)
engine = self._get_engine(engine, *args, **kwargs)
if table:
metadata = MetaData()
table = Table(table, metadata, autoload=True, autoload_with=db)
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
query = table.select()
if filter:
@ -142,7 +177,7 @@ class DbPlugin(Plugin):
if query is None:
raise RuntimeError('You need to specify either "query", or "table" and "filter"')
with db.connect() as connection:
with engine.connect() as connection:
result = connection.execute(query)
columns = result.keys()
@ -201,15 +236,14 @@ class DbPlugin(Plugin):
if key_columns is None:
key_columns = []
db = self._get_engine(engine, *args, **kwargs)
engine = self._get_engine(engine, *args, **kwargs)
for record in records:
metadata = MetaData()
table = Table(table, metadata, autoload=True, autoload_with=db)
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
insert = table.insert().values(**record)
try:
db.execute(insert)
engine.execute(insert)
except Exception as e:
if on_duplicate_update and key_columns:
self.update(table=table, records=records,
@ -264,8 +298,7 @@ class DbPlugin(Plugin):
engine = self._get_engine(engine, *args, **kwargs)
for record in records:
metadata = MetaData()
table = Table(table, metadata, autoload=True, autoload_with=engine)
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
key = { k:v for (k,v) in record.items() if k in key_columns }
values = { k:v for (k,v) in record.items() if k not in key_columns }
@ -313,8 +346,7 @@ class DbPlugin(Plugin):
engine = self._get_engine(engine, *args, **kwargs)
for record in records:
metadata = MetaData()
table = Table(table, metadata, autoload=True, autoload_with=engine)
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
delete = table.delete()
for (k,v) in record.items():