diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 62f6b5c011..9e804578cf 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -2,7 +2,10 @@ .. moduleauthor:: Fabio Manganiello """ +import time + from sqlalchemy import create_engine, Table, MetaData +from sqlalchemy.engine import Engine from platypush.plugins import Plugin, action @@ -18,6 +21,8 @@ class DbPlugin(Plugin): """ engine = None + _db_error_wait_interval = 5.0 + _db_error_retries = 3 def __init__(self, engine=None, *args, **kwargs): """ @@ -32,9 +37,12 @@ class DbPlugin(Plugin): def _get_engine(self, engine=None, *args, **kwargs): if engine: + if isinstance(engine, Engine): + return engine + return create_engine(engine, *args, **kwargs) - else: - return self.engine + + return self.engine @staticmethod def _build_condition(table, column, value): @@ -70,6 +78,34 @@ class DbPlugin(Plugin): with engine.connect() as connection: result = connection.execute(statement) + def _get_table(self, table, engine=None, *args, **kwargs): + if not engine: + engine = self._get_engine(engine, *args, **kwargs) + + db_ok = False + n_tries = 0 + last_error = None + + while not db_ok and n_tries < self._db_error_retries: + try: + n_tries += 1 + metadata = MetaData() + table = Table(table, metadata, autoload=True, autoload_with=engine) + db_ok = True + except Exception as e: + last_error = e + wait_time = self._db_error_wait_interval * n_tries + self.logger.exception(e) + self.logger.info('Waiting {} seconds before retrying'.format(wait_time)) + time.sleep(wait_time) + engine = self._get_engine(engine, *args, **kwargs) + + if not db_ok and last_error: + raise last_error + + return table, engine + + @action def select(self, query=None, table=None, filter=None, engine=None, *args, **kwargs): """ @@ -128,11 +164,10 @@ class DbPlugin(Plugin): ] """ - db = self._get_engine(engine, *args, **kwargs) + engine = self._get_engine(engine, *args, **kwargs) if table: - metadata = MetaData() - table = Table(table, metadata, autoload=True, autoload_with=db) + table, engine = self._get_table(table, engine=engine, *args, **kwargs) query = table.select() if filter: @@ -142,7 +177,7 @@ class DbPlugin(Plugin): if query is None: raise RuntimeError('You need to specify either "query", or "table" and "filter"') - with db.connect() as connection: + with engine.connect() as connection: result = connection.execute(query) columns = result.keys() @@ -201,15 +236,14 @@ class DbPlugin(Plugin): if key_columns is None: key_columns = [] - db = self._get_engine(engine, *args, **kwargs) + engine = self._get_engine(engine, *args, **kwargs) for record in records: - metadata = MetaData() - table = Table(table, metadata, autoload=True, autoload_with=db) + table, engine = self._get_table(table, engine=engine, *args, **kwargs) insert = table.insert().values(**record) try: - db.execute(insert) + engine.execute(insert) except Exception as e: if on_duplicate_update and key_columns: self.update(table=table, records=records, @@ -264,8 +298,7 @@ class DbPlugin(Plugin): engine = self._get_engine(engine, *args, **kwargs) for record in records: - metadata = MetaData() - table = Table(table, metadata, autoload=True, autoload_with=engine) + table, engine = self._get_table(table, engine=engine, *args, **kwargs) key = { k:v for (k,v) in record.items() if k in key_columns } values = { k:v for (k,v) in record.items() if k not in key_columns } @@ -313,8 +346,7 @@ class DbPlugin(Plugin): engine = self._get_engine(engine, *args, **kwargs) for record in records: - metadata = MetaData() - table = Table(table, metadata, autoload=True, autoload_with=engine) + table, engine = self._get_table(table, engine=engine, *args, **kwargs) delete = table.delete() for (k,v) in record.items():