From f40f956507052f80921aeb5ac14712a108c00cda Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 29 Apr 2023 11:36:55 +0200 Subject: [PATCH] Migrated `variable` table to the new entities framework. --- platypush/entities/variables.py | 29 ++++ .../c39ac404119b_migrate_variable_table.py | 153 ++++++++++++++++++ platypush/plugins/variable/__init__.py | 134 +++++++-------- 3 files changed, 238 insertions(+), 78 deletions(-) create mode 100644 platypush/entities/variables.py create mode 100644 platypush/migrations/alembic/versions/c39ac404119b_migrate_variable_table.py diff --git a/platypush/entities/variables.py b/platypush/entities/variables.py new file mode 100644 index 0000000000..39c7a8ff76 --- /dev/null +++ b/platypush/entities/variables.py @@ -0,0 +1,29 @@ +import logging + +from sqlalchemy import Column, ForeignKey, Integer, String + +from platypush.common.db import Base + +from . import Entity + +logger = logging.getLogger(__name__) + + +if 'variable' not in Base.metadata: + + class Variable(Entity): + """ + Models a variable entity. + """ + + __tablename__ = 'variable' + + id = Column( + Integer, ForeignKey('entity.id', ondelete='CASCADE'), primary_key=True + ) + value = Column(String) + + __table_args__ = {'keep_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } diff --git a/platypush/migrations/alembic/versions/c39ac404119b_migrate_variable_table.py b/platypush/migrations/alembic/versions/c39ac404119b_migrate_variable_table.py new file mode 100644 index 0000000000..8efa908543 --- /dev/null +++ b/platypush/migrations/alembic/versions/c39ac404119b_migrate_variable_table.py @@ -0,0 +1,153 @@ +"""Migrate variable table + +Revision ID: c39ac404119b +Revises: d030953a871d +Create Date: 2023-04-28 22:35:28.307954 + +""" + +import sqlalchemy as sa +from alembic import op + +from platypush.entities import Entity + +# revision identifiers, used by Alembic. +revision = 'c39ac404119b' +down_revision = 'd030953a871d' +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Get the connection and the existing `variable` table + conn = op.get_bind() + metadata = sa.MetaData() + metadata.reflect(bind=conn) + VariableOld = metadata.tables.get('variable') + + if VariableOld is None: + print('The table `variable` does not exist, skipping migration') + return + + # Create the `variable_new` table + VariableNew = op.create_table( + 'variable_new', + sa.Column( + 'id', + sa.Integer, + sa.ForeignKey(Entity.id, ondelete='CASCADE'), + primary_key=True, + ), + sa.Column('value', sa.String), + ) + + assert VariableNew is not None, 'Could not create table "variable_new"' + + # Select all existing variables + existing_vars = { + var.name: var.value for var in conn.execute(sa.select(VariableOld)).all() + } + + # Insert all the existing variables as entities + if existing_vars: + conn.execute( + sa.insert(Entity).values( + [ + { + 'external_id': name, + 'name': name, + 'type': 'variable', + 'plugin': 'variable', + } + for name in existing_vars + ] + ) + ) + + # Fetch all the newly inserted variables + new_vars = { + entity.id: entity.name + for entity in conn.execute( + sa.select(Entity.id, Entity.name).where( + sa.or_( + *[ + sa.and_( + Entity.external_id == name, + Entity.type == 'variable', + Entity.plugin == 'variable', + ) + for name in existing_vars + ] + ) + ) + ).all() + } + + # Insert the mapping on the `variable_new` table + op.bulk_insert( + VariableNew, + [ + { + 'id': id, + 'value': existing_vars.get(name), + } + for id, name in new_vars.items() + ], + ) + + # Rename/drop the tables + op.rename_table('variable', 'variable_old') + op.rename_table('variable_new', 'variable') + op.drop_table('variable_old') + + +def downgrade() -> None: + # Get the connection and the existing `variable` table + conn = op.get_bind() + metadata = sa.MetaData() + metadata.reflect(bind=conn) + VariableNew = metadata.tables['variable'] + + if VariableNew is None: + print('The table `variable` does not exist, skipping migration') + return + + # Create the `variable_old` table + VariableOld = op.create_table( + 'variable_old', + sa.Column('name', sa.String, primary_key=True, nullable=False), + sa.Column('value', sa.String), + ) + + assert VariableOld is not None, 'Could not create table "variable_old"' + + # Select all existing variables + existing_vars = { + var.name: var.value + for var in conn.execute( + sa.select(Entity.name, VariableNew.c.value).join( + Entity, Entity.id == VariableNew.c.id + ) + ).all() + } + + # Insert the mapping on the `variable_old` table + if existing_vars: + op.bulk_insert( + VariableOld, + [ + { + 'name': name, + 'value': value, + } + for name, value in existing_vars.items() + ], + ) + + # Delete existing references on the `entity` table + conn.execute(sa.delete(Entity).where(Entity.type == 'variable')) + + # Rename/drop the tables + op.rename_table('variable', 'variable_new') + op.rename_table('variable_old', 'variable') + op.drop_table('variable_new') diff --git a/platypush/plugins/variable/__init__.py b/platypush/plugins/variable/__init__.py index 9556b07f9d..020c4ee431 100644 --- a/platypush/plugins/variable/__init__.py +++ b/platypush/plugins/variable/__init__.py @@ -1,24 +1,13 @@ -from sqlalchemy import Column, String +from typing import Collection, Dict, Optional +from typing_extensions import override -from platypush.common.db import declarative_base -from platypush.context import get_plugin +# from platypush.common.db import Base +from platypush.entities import EntityManager +from platypush.entities.variables import Variable from platypush.plugins import Plugin, action -from platypush.plugins.db import DbPlugin - -Base = declarative_base() -# pylint: disable=too-few-public-methods -class Variable(Base): - """Models the variable table""" - - __tablename__ = 'variable' - - name = Column(String, primary_key=True, nullable=False) - value = Column(String) - - -class VariablePlugin(Plugin): +class VariablePlugin(Plugin, EntityManager): """ This plugin allows you to manipulate context variables that can be accessed across your tasks. It requires the :mod:`platypush.plugins.db` @@ -27,39 +16,35 @@ class VariablePlugin(Plugin): """ def __init__(self, **kwargs): - """ - The plugin will create a table named ``variable`` on the database - configured in the :mod:`platypush.plugins.db` plugin. You'll have - to specify a default ``engine`` in your ``db`` plugin configuration. - """ - super().__init__(**kwargs) - db_plugin = get_plugin('db') - redis_plugin = get_plugin('redis') - assert db_plugin, 'Database plugin not configured' - assert redis_plugin, 'Redis plugin not configured' - self.redis_plugin = redis_plugin - self.db_plugin: DbPlugin = db_plugin - self.db_plugin.create_all(self.db_plugin.get_engine(), Base) + db = self._db + self._db_vars: Dict[str, Optional[str]] = {} + """ Local cache for db variables. """ + + # db.create_all(db.get_engine(), Base) + with db.get_session() as session: + self._db_vars.update( + { # type: ignore + str(var.name): var.value for var in session.query(Variable).all() + } + ) @action - def get(self, name, default_value=None): + def get(self, name: Optional[str] = None, default_value=None): """ Get the value of a variable by name from the local db. - :param name: Variable name - :type name: str - + :param name: Variable name. If not specified, all the stored variables will be returned. :param default_value: What will be returned if the variable is not defined (default: None) - :returns: A map in the format ``{"":""}`` """ - with self.db_plugin.get_session() as session: - var = session.query(Variable).filter_by(name=name).first() - - return {name: (var.value if var is not None else default_value)} + return ( + {name: self._db_vars.get(name, default_value)} + if name is not None + else self.status().output + ) @action def set(self, **kwargs): @@ -69,53 +54,32 @@ class VariablePlugin(Plugin): :param kwargs: Key-value list of variables to set (e.g. ``foo='bar', answer=42``) """ - with self.db_plugin.get_session() as session: - existing_vars = { - var.name: var - for var in session.query(Variable) - .filter(Variable.name.in_(kwargs.keys())) - .all() - } - - new_vars = { - name: Variable(name=name, value=value) - for name, value in kwargs.items() - if name not in existing_vars - } - - for name, var in existing_vars.items(): - var.value = kwargs[name] # type: ignore - - session.add_all([*existing_vars.values(), *new_vars.values()]) - + self.publish_entities(kwargs) + self._db_vars.update(kwargs) return kwargs @action - def unset(self, name): + def unset(self, name: str): """ Unset a variable by name if it's set on the local db. - :param name: Name of the variable to remove - :type name: str + :param name: Name of the variable to remove. """ - with self.db_plugin.get_session() as session: - session.query(Variable).filter_by(name=name).delete() - + self.publish_entities({name: None}) + self._db_vars.pop(name, None) return True @action - def mget(self, name): + def mget(self, name: str): """ Get the value of a variable by name from Redis. :param name: Variable name - :type name: str - :returns: A map in the format ``{"":""}`` """ - return self.redis_plugin.mget([name]) + return self._redis.mget([name]) @action def mset(self, **kwargs): @@ -123,37 +87,51 @@ class VariablePlugin(Plugin): Set a variable or a set of variables on Redis. :param kwargs: Key-value list of variables to set (e.g. ``foo='bar', answer=42``) - :returns: A map with the set variables """ - self.redis_plugin.mset(**kwargs) + self._redis.mset(**kwargs) return kwargs @action - def munset(self, name): + def munset(self, name: str): """ Unset a Redis variable by name if it's set :param name: Name of the variable to remove - :type name: str """ - return self.redis_plugin.delete(name) + return self._redis.delete(name) @action - def expire(self, name, expire): + def expire(self, name: str, expire: int): """ Set a variable expiration on Redis :param name: Variable name - :type name: str - :param expire: Expiration time in seconds - :type expire: int """ - return self.redis_plugin.expire(name, expire) + return self._redis.expire(name, expire) + + @override + def transform_entities(self, entities: dict) -> Collection[Variable]: + return super().transform_entities( + [ + Variable(id=name, name=name, value=value) + for name, value in entities.items() + ] + ) + + @override + @action + def status(self): + variables = { + name: value for name, value in self._db_vars.items() if value is not None + } + + super().publish_entities(variables) + return variables # vim:sw=4:ts=4:et: