diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index f4594f089..8cb0f89bb 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -3,9 +3,11 @@ """ import time +from typing import Optional from sqlalchemy import create_engine, Table, MetaData from sqlalchemy.engine import Engine +from sqlalchemy.sql import text from platypush.plugins import Plugin, action @@ -23,10 +25,17 @@ class DbPlugin(Plugin): def __init__(self, engine=None, *args, **kwargs): """ - :param engine: Default SQLAlchemy connection engine string (e.g. ``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``) that will be used. You can override the default engine in the db actions. + :param engine: Default SQLAlchemy connection engine string (e.g. + ``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``) + that will be used. You can override the default engine in the db + actions. :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ super().__init__() @@ -41,11 +50,11 @@ class DbPlugin(Plugin): return create_engine(engine, *args, **kwargs) + assert self.engine return self.engine - # noinspection PyUnusedLocal @staticmethod - def _build_condition(table, column, value): + def _build_condition(table, column, value): # type: ignore if isinstance(value, str): value = "'{}'".format(value) elif not isinstance(value, int) and not isinstance(value, float): @@ -69,8 +78,12 @@ class DbPlugin(Plugin): :type statement: str :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) """ engine = self._get_engine(engine, *args, **kwargs) @@ -106,24 +119,42 @@ class DbPlugin(Plugin): return table, engine @action - def select(self, query=None, table=None, filter=None, engine=None, *args, **kwargs): + def select( + self, + query=None, + table=None, + filter=None, + engine=None, + data: Optional[dict] = None, + *args, + **kwargs + ): """ Returns rows (as a list of hashes) given a query. :param query: SQL to be executed :type query: str - :param filter: Query WHERE filter expressed as a dictionary. This approach is preferred over specifying raw SQL - in ``query`` as the latter approach may be prone to SQL injection, unless you need to build some complex - SQL logic. + :param filter: Query WHERE filter expressed as a dictionary. This + approach is preferred over specifying raw SQL + in ``query`` as the latter approach may be prone to SQL injection, + unless you need to build some complex SQL logic. :type filter: dict - :param table: If you specified a filter instead of a raw query, you'll have to specify the target table + :param table: If you specified a filter instead of a raw query, you'll + have to specify the target table :type table: str :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` - (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` - (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param data: If ``query`` is an SQL string, then you can use + SQLAlchemy's *placeholders* mechanism. You can specify placeholders + in the query for values that you want to be safely serialized, and + their values can be specified on the ``data`` attribute in a + ``name`` ➡️ ``value`` mapping format. + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` (see + https:///docs.sqlalchemy.org/en/latest/core/engines.html) :returns: List of hashes representing the result rows. Examples: @@ -136,7 +167,10 @@ class DbPlugin(Plugin): "action": "db.select", "args": { "engine": "sqlite:///:memory:", - "query": "SELECT id, name FROM table" + "query": "SELECT id, name FROM table WHERE name = :name", + "data": { + "name": "foobar" + } } } @@ -165,19 +199,24 @@ class DbPlugin(Plugin): engine = self._get_engine(engine, *args, **kwargs) + if isinstance(query, str): + query = text(query) + if table: table, engine = self._get_table(table, engine=engine, *args, **kwargs) query = table.select() if filter: - for (k,v) in filter.items(): + for (k, v) in filter.items(): query = query.where(self._build_condition(table, k, v)) if query is None: - raise RuntimeError('You need to specify either "query", or "table" and "filter"') + raise RuntimeError( + 'You need to specify either "query", or "table" and "filter"' + ) with engine.connect() as connection: - result = connection.execute(query) + result = connection.execute(query, **(data or {})) columns = result.keys() rows = [ {col: row[i] for i, col in enumerate(list(columns))} @@ -187,8 +226,16 @@ class DbPlugin(Plugin): return rows @action - def insert(self, table, records, engine=None, key_columns=None, - on_duplicate_update=False, *args, **kwargs): + def insert( + self, + table, + records, + engine=None, + key_columns=None, + on_duplicate_update=False, + *args, + **kwargs + ): """ Inserts records (as a list of hashes) into a table. @@ -198,12 +245,20 @@ class DbPlugin(Plugin): :type records: list :param engine: Engine to be used (default: default class engine) :type engine: str - :param key_columns: Set it to specify the names of the key columns for ``table``. Set it if you want your statement to be executed with the ``on_duplicate_update`` flag. + :param key_columns: Set it to specify the names of the key columns for + ``table``. Set it if you want your statement to be executed with + the ``on_duplicate_update`` flag. :type key_columns: list - :param on_duplicate_update: If set, update the records in case of duplicate rows (default: False). If set, you'll need to specify ``key_columns`` as well. + :param on_duplicate_update: If set, update the records in case of + duplicate rows (default: False). If set, you'll need to specify + ``key_columns`` as well. :type on_duplicate_update: bool - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) Example: @@ -238,15 +293,21 @@ class DbPlugin(Plugin): for record in records: table, engine = self._get_table(table, engine=engine, *args, **kwargs) + insert = table.insert().values(**record) try: engine.execute(insert) except Exception as e: if on_duplicate_update and key_columns: - self.update(table=table, records=records, - key_columns=key_columns, engine=engine, - *args, **kwargs) + self.update( + table=table, + records=records, + key_columns=key_columns, + engine=engine, + *args, + **kwargs + ) else: raise e @@ -263,8 +324,12 @@ class DbPlugin(Plugin): :type key_columns: list :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) Example: @@ -297,12 +362,12 @@ class DbPlugin(Plugin): for record in records: table, engine = self._get_table(table, engine=engine, *args, **kwargs) - key = { k:v for (k,v) in record.items() if k in key_columns } - values = { k:v for (k,v) in record.items() if k not in key_columns } + key = {k: v for (k, v) in record.items() if k in key_columns} + values = {k: v for (k, v) in record.items() if k not in key_columns} update = table.update() - for (k,v) in key.items(): + for (k, v) in key.items(): update = update.where(self._build_condition(table, k, v)) update = update.values(**values) @@ -319,8 +384,12 @@ class DbPlugin(Plugin): :type records: list :param engine: Engine to be used (default: default class engine) :type engine: str - :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html) - :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html) + :param args: Extra arguments that will be passed to + ``sqlalchemy.create_engine`` (see + https://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to + ``sqlalchemy.create_engine`` + (see https:///docs.sqlalchemy.org/en/latest/core/engines.html) Example: @@ -347,7 +416,7 @@ class DbPlugin(Plugin): table, engine = self._get_table(table, engine=engine, *args, **kwargs) delete = table.delete() - for (k,v) in record.items(): + for (k, v) in record.items(): delete = delete.where(self._build_condition(table, k, v)) engine.execute(delete)