diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 0e4b84a9a1..b9d9e959e8 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -105,7 +105,9 @@ class DbPlugin(Plugin): (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ - with self.get_engine(engine, *args, **kwargs).connect() as connection: + with self.get_engine( + engine, *args, **kwargs + ).connect() as connection, connection.begin(): connection.execute(text(statement)) def _get_table(self, table: str, *args, engine=None, **kwargs): @@ -324,7 +326,7 @@ class DbPlugin(Plugin): update_records = [] returned_records = [] - with engine.connect() as connection: + with engine.connect() as connection, connection.begin(): # Upsert case if key_columns: insert_records, update_records = self._get_new_and_existing_records( @@ -462,7 +464,7 @@ class DbPlugin(Plugin): } """ engine = self.get_engine(engine, *args, **kwargs) - with engine.connect() as connection: + with engine.connect() as connection, connection.begin(): table, engine = self._get_table(table, *args, engine=engine, **kwargs) return self._update(connection, table, records, key_columns) @@ -505,7 +507,7 @@ class DbPlugin(Plugin): engine = self.get_engine(engine, *args, **kwargs) - with engine.connect() as connection: + with engine.connect() as connection, connection.begin(): for record in records: table_, engine = self._get_table(table, *args, engine=engine, **kwargs) delete = table_.delete()