forked from platypush/platypush
Support for query placeholders in db.select
This commit is contained in:
parent
e77d6a4ad4
commit
1ea53a6f50
1 changed files with 106 additions and 37 deletions
|
@ -3,9 +3,11 @@
|
|||
"""
|
||||
|
||||
import time
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import create_engine, Table, MetaData
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlalchemy.sql import text
|
||||
|
||||
from platypush.plugins import Plugin, action
|
||||
|
||||
|
@ -23,10 +25,17 @@ class DbPlugin(Plugin):
|
|||
|
||||
def __init__(self, engine=None, *args, **kwargs):
|
||||
"""
|
||||
:param engine: Default SQLAlchemy connection engine string (e.g. ``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``) that will be used. You can override the default engine in the db actions.
|
||||
:param engine: Default SQLAlchemy connection engine string (e.g.
|
||||
``sqlite:///:memory:`` or ``mysql://user:pass@localhost/test``)
|
||||
that will be used. You can override the default engine in the db
|
||||
actions.
|
||||
:type engine: str
|
||||
:param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param args: Extra arguments that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to
|
||||
``sqlalchemy.create_engine``
|
||||
(see https:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
"""
|
||||
|
||||
super().__init__()
|
||||
|
@ -41,11 +50,11 @@ class DbPlugin(Plugin):
|
|||
|
||||
return create_engine(engine, *args, **kwargs)
|
||||
|
||||
assert self.engine
|
||||
return self.engine
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
@staticmethod
|
||||
def _build_condition(table, column, value):
|
||||
def _build_condition(table, column, value): # type: ignore
|
||||
if isinstance(value, str):
|
||||
value = "'{}'".format(value)
|
||||
elif not isinstance(value, int) and not isinstance(value, float):
|
||||
|
@ -69,8 +78,12 @@ class DbPlugin(Plugin):
|
|||
:type statement: str
|
||||
:param engine: Engine to be used (default: default class engine)
|
||||
:type engine: str
|
||||
:param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param args: Extra arguments that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to
|
||||
``sqlalchemy.create_engine``
|
||||
(see https:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
"""
|
||||
|
||||
engine = self._get_engine(engine, *args, **kwargs)
|
||||
|
@ -106,24 +119,42 @@ class DbPlugin(Plugin):
|
|||
return table, engine
|
||||
|
||||
@action
|
||||
def select(self, query=None, table=None, filter=None, engine=None, *args, **kwargs):
|
||||
def select(
|
||||
self,
|
||||
query=None,
|
||||
table=None,
|
||||
filter=None,
|
||||
engine=None,
|
||||
data: Optional[dict] = None,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
Returns rows (as a list of hashes) given a query.
|
||||
|
||||
:param query: SQL to be executed
|
||||
:type query: str
|
||||
:param filter: Query WHERE filter expressed as a dictionary. This approach is preferred over specifying raw SQL
|
||||
in ``query`` as the latter approach may be prone to SQL injection, unless you need to build some complex
|
||||
SQL logic.
|
||||
:param filter: Query WHERE filter expressed as a dictionary. This
|
||||
approach is preferred over specifying raw SQL
|
||||
in ``query`` as the latter approach may be prone to SQL injection,
|
||||
unless you need to build some complex SQL logic.
|
||||
:type filter: dict
|
||||
:param table: If you specified a filter instead of a raw query, you'll have to specify the target table
|
||||
:param table: If you specified a filter instead of a raw query, you'll
|
||||
have to specify the target table
|
||||
:type table: str
|
||||
:param engine: Engine to be used (default: default class engine)
|
||||
:type engine: str
|
||||
:param args: Extra arguments that will be passed to ``sqlalchemy.create_engine``
|
||||
(see https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine``
|
||||
(seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param data: If ``query`` is an SQL string, then you can use
|
||||
SQLAlchemy's *placeholders* mechanism. You can specify placeholders
|
||||
in the query for values that you want to be safely serialized, and
|
||||
their values can be specified on the ``data`` attribute in a
|
||||
``name`` ➡️ ``value`` mapping format.
|
||||
:param args: Extra arguments that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:returns: List of hashes representing the result rows.
|
||||
|
||||
Examples:
|
||||
|
@ -136,7 +167,10 @@ class DbPlugin(Plugin):
|
|||
"action": "db.select",
|
||||
"args": {
|
||||
"engine": "sqlite:///:memory:",
|
||||
"query": "SELECT id, name FROM table"
|
||||
"query": "SELECT id, name FROM table WHERE name = :name",
|
||||
"data": {
|
||||
"name": "foobar"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -165,19 +199,24 @@ class DbPlugin(Plugin):
|
|||
|
||||
engine = self._get_engine(engine, *args, **kwargs)
|
||||
|
||||
if isinstance(query, str):
|
||||
query = text(query)
|
||||
|
||||
if table:
|
||||
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
|
||||
query = table.select()
|
||||
|
||||
if filter:
|
||||
for (k,v) in filter.items():
|
||||
for (k, v) in filter.items():
|
||||
query = query.where(self._build_condition(table, k, v))
|
||||
|
||||
if query is None:
|
||||
raise RuntimeError('You need to specify either "query", or "table" and "filter"')
|
||||
raise RuntimeError(
|
||||
'You need to specify either "query", or "table" and "filter"'
|
||||
)
|
||||
|
||||
with engine.connect() as connection:
|
||||
result = connection.execute(query)
|
||||
result = connection.execute(query, **(data or {}))
|
||||
columns = result.keys()
|
||||
rows = [
|
||||
{col: row[i] for i, col in enumerate(list(columns))}
|
||||
|
@ -187,8 +226,16 @@ class DbPlugin(Plugin):
|
|||
return rows
|
||||
|
||||
@action
|
||||
def insert(self, table, records, engine=None, key_columns=None,
|
||||
on_duplicate_update=False, *args, **kwargs):
|
||||
def insert(
|
||||
self,
|
||||
table,
|
||||
records,
|
||||
engine=None,
|
||||
key_columns=None,
|
||||
on_duplicate_update=False,
|
||||
*args,
|
||||
**kwargs
|
||||
):
|
||||
"""
|
||||
Inserts records (as a list of hashes) into a table.
|
||||
|
||||
|
@ -198,12 +245,20 @@ class DbPlugin(Plugin):
|
|||
:type records: list
|
||||
:param engine: Engine to be used (default: default class engine)
|
||||
:type engine: str
|
||||
:param key_columns: Set it to specify the names of the key columns for ``table``. Set it if you want your statement to be executed with the ``on_duplicate_update`` flag.
|
||||
:param key_columns: Set it to specify the names of the key columns for
|
||||
``table``. Set it if you want your statement to be executed with
|
||||
the ``on_duplicate_update`` flag.
|
||||
:type key_columns: list
|
||||
:param on_duplicate_update: If set, update the records in case of duplicate rows (default: False). If set, you'll need to specify ``key_columns`` as well.
|
||||
:param on_duplicate_update: If set, update the records in case of
|
||||
duplicate rows (default: False). If set, you'll need to specify
|
||||
``key_columns`` as well.
|
||||
:type on_duplicate_update: bool
|
||||
:param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param args: Extra arguments that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to
|
||||
``sqlalchemy.create_engine``
|
||||
(see https:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -238,15 +293,21 @@ class DbPlugin(Plugin):
|
|||
|
||||
for record in records:
|
||||
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
|
||||
|
||||
insert = table.insert().values(**record)
|
||||
|
||||
try:
|
||||
engine.execute(insert)
|
||||
except Exception as e:
|
||||
if on_duplicate_update and key_columns:
|
||||
self.update(table=table, records=records,
|
||||
key_columns=key_columns, engine=engine,
|
||||
*args, **kwargs)
|
||||
self.update(
|
||||
table=table,
|
||||
records=records,
|
||||
key_columns=key_columns,
|
||||
engine=engine,
|
||||
*args,
|
||||
**kwargs
|
||||
)
|
||||
else:
|
||||
raise e
|
||||
|
||||
|
@ -263,8 +324,12 @@ class DbPlugin(Plugin):
|
|||
:type key_columns: list
|
||||
:param engine: Engine to be used (default: default class engine)
|
||||
:type engine: str
|
||||
:param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param args: Extra arguments that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to
|
||||
``sqlalchemy.create_engine``
|
||||
(see https:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -297,12 +362,12 @@ class DbPlugin(Plugin):
|
|||
|
||||
for record in records:
|
||||
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
|
||||
key = { k:v for (k,v) in record.items() if k in key_columns }
|
||||
values = { k:v for (k,v) in record.items() if k not in key_columns }
|
||||
key = {k: v for (k, v) in record.items() if k in key_columns}
|
||||
values = {k: v for (k, v) in record.items() if k not in key_columns}
|
||||
|
||||
update = table.update()
|
||||
|
||||
for (k,v) in key.items():
|
||||
for (k, v) in key.items():
|
||||
update = update.where(self._build_condition(table, k, v))
|
||||
|
||||
update = update.values(**values)
|
||||
|
@ -319,8 +384,12 @@ class DbPlugin(Plugin):
|
|||
:type records: list
|
||||
:param engine: Engine to be used (default: default class engine)
|
||||
:type engine: str
|
||||
:param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (seehttps:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param args: Extra arguments that will be passed to
|
||||
``sqlalchemy.create_engine`` (see
|
||||
https://docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
:param kwargs: Extra kwargs that will be passed to
|
||||
``sqlalchemy.create_engine``
|
||||
(see https:///docs.sqlalchemy.org/en/latest/core/engines.html)
|
||||
|
||||
Example:
|
||||
|
||||
|
@ -347,7 +416,7 @@ class DbPlugin(Plugin):
|
|||
table, engine = self._get_table(table, engine=engine, *args, **kwargs)
|
||||
delete = table.delete()
|
||||
|
||||
for (k,v) in record.items():
|
||||
for (k, v) in record.items():
|
||||
delete = delete.where(self._build_condition(table, k, v))
|
||||
|
||||
engine.execute(delete)
|
||||
|
|
Loading…
Add table
Reference in a new issue