diff --git a/platypush/plugins/db/__init__.py b/platypush/plugins/db/__init__.py index 078bd48bd..59776bd60 100644 --- a/platypush/plugins/db/__init__.py +++ b/platypush/plugins/db/__init__.py @@ -14,9 +14,6 @@ class DbPlugin(Plugin): Requires: * **sqlalchemy** (``pip install sqlalchemy``) - - .. todo:: - Implement ``update`` and ``delete`` methods """ engine = None @@ -122,7 +119,8 @@ class DbPlugin(Plugin): @action - def insert(self, table, records, engine=None, *args, **kwargs): + def insert(self, table, records, engine=None, key_columns=[], + on_duplicate_update=False, *args, **kwargs): """ Inserts records (as a list of hashes) into a table. @@ -132,6 +130,10 @@ class DbPlugin(Plugin): :type records: list :param engine: Engine to be used (default: default class engine) :type engine: str + :param key_columns: Set it to specify the names of the key columns for ``table``. Set it if you want your statement to be executed with the ``on_duplicate_update`` flag. + :type key_columns: list + :param on_duplicate_update: If set, update the records in case of duplicate rows (default: False). If set, you'll need to specify ``key_columns`` as well. + :type on_duplicate_update: bool :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see http://docs.sqlalchemy.org/en/latest/core/engines.html) :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (see http://docs.sqlalchemy.org/en/latest/core/engines.html) @@ -161,13 +163,128 @@ class DbPlugin(Plugin): } """ + db = self._get_engine(engine, *args, **kwargs) + + for record in records: + metadata = MetaData() + table = Table(table, metadata, autoload=True, autoload_with=db) + insert = table.insert().values(**record) + + try: + db.execute(insert) + except Exception as e: + if on_duplicate_update and key_columns: + self.update(table=table, records=records, + key_columns=key_columns, engine=engine, + *args, **kwargs) + else: + raise e + + + @action + def update(self, table, records, key_columns, engine=None, *args, **kwargs): + """ + Updates records on a table. + + :param table: Table name + :type table: str + :param records: Records to be updated (as a list of hashes) + :type records: list + :param key_columns: Names of the key columns, used in the WHERE condition + :type key_columns: list + :param engine: Engine to be used (default: default class engine) + :type engine: str + :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see http://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (see http://docs.sqlalchemy.org/en/latest/core/engines.html) + + Example: + + Request:: + + { + "type": "request", + "target": "your_host", + "action": "db.update", + "args": { + "table": "table", + "engine": "sqlite:///:memory:", + "key_columns": ["id"], + "records": [ + { + "id": 1, + "name": foo + }, + + { + "id": 2, + "name": bar + } + ] + } + } + """ + engine = self._get_engine(engine, *args, **kwargs) for record in records: metadata = MetaData() table = Table(table, metadata, autoload=True, autoload_with=engine) - insert = table.insert().values(**record) - engine.execute(insert) + key = { k:v for (k,v) in record.items() if k in key_columns } + values = { k:v for (k,v) in record.items() if k not in key_columns } + + update = table.update() + + for (k,v) in key.items(): + update = update.where(eval('table.c.{}=={}'.format(k, v))) + + update = update.values(**values) + engine.execute(update) + + + @action + def delete(self, table, records, engine=None, *args, **kwargs): + """ + Deletes records from a table. + + :param table: Table name + :type table: str + :param records: Records to be deleted, as a list of dictionaries + :type records: list + :param engine: Engine to be used (default: default class engine) + :type engine: str + :param args: Extra arguments that will be passed to ``sqlalchemy.create_engine`` (see http://docs.sqlalchemy.org/en/latest/core/engines.html) + :param kwargs: Extra kwargs that will be passed to ``sqlalchemy.create_engine`` (see http://docs.sqlalchemy.org/en/latest/core/engines.html) + + Example: + + Request:: + + { + "type": "request", + "target": "your_host", + "action": "db.delete", + "args": { + "table": "table", + "engine": "sqlite:///:memory:", + "records": [ + { "id": 1 }, + { "id": 2 } + ] + } + } + """ + + engine = self._get_engine(engine, *args, **kwargs) + + for record in records: + metadata = MetaData() + table = Table(table, metadata, autoload=True, autoload_with=engine) + delete = table.delete() + + for (k,v) in record.items(): + delete = delete.where(eval('table.c.{}=={}'.format(k, v))) + + engine.execute(delete) # vim:sw=4:ts=4:et: