forked from platypush/platypush
Migrated variable
table to the new entities framework.
This commit is contained in:
parent
8fe61217ce
commit
f40f956507
3 changed files with 238 additions and 78 deletions
29
platypush/entities/variables.py
Normal file
29
platypush/entities/variables.py
Normal file
|
@ -0,0 +1,29 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from sqlalchemy import Column, ForeignKey, Integer, String
|
||||||
|
|
||||||
|
from platypush.common.db import Base
|
||||||
|
|
||||||
|
from . import Entity
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
if 'variable' not in Base.metadata:
|
||||||
|
|
||||||
|
class Variable(Entity):
|
||||||
|
"""
|
||||||
|
Models a variable entity.
|
||||||
|
"""
|
||||||
|
|
||||||
|
__tablename__ = 'variable'
|
||||||
|
|
||||||
|
id = Column(
|
||||||
|
Integer, ForeignKey('entity.id', ondelete='CASCADE'), primary_key=True
|
||||||
|
)
|
||||||
|
value = Column(String)
|
||||||
|
|
||||||
|
__table_args__ = {'keep_existing': True}
|
||||||
|
__mapper_args__ = {
|
||||||
|
'polymorphic_identity': __tablename__,
|
||||||
|
}
|
|
@ -0,0 +1,153 @@
|
||||||
|
"""Migrate variable table
|
||||||
|
|
||||||
|
Revision ID: c39ac404119b
|
||||||
|
Revises: d030953a871d
|
||||||
|
Create Date: 2023-04-28 22:35:28.307954
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
from platypush.entities import Entity
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = 'c39ac404119b'
|
||||||
|
down_revision = 'd030953a871d'
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# Get the connection and the existing `variable` table
|
||||||
|
conn = op.get_bind()
|
||||||
|
metadata = sa.MetaData()
|
||||||
|
metadata.reflect(bind=conn)
|
||||||
|
VariableOld = metadata.tables.get('variable')
|
||||||
|
|
||||||
|
if VariableOld is None:
|
||||||
|
print('The table `variable` does not exist, skipping migration')
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create the `variable_new` table
|
||||||
|
VariableNew = op.create_table(
|
||||||
|
'variable_new',
|
||||||
|
sa.Column(
|
||||||
|
'id',
|
||||||
|
sa.Integer,
|
||||||
|
sa.ForeignKey(Entity.id, ondelete='CASCADE'),
|
||||||
|
primary_key=True,
|
||||||
|
),
|
||||||
|
sa.Column('value', sa.String),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert VariableNew is not None, 'Could not create table "variable_new"'
|
||||||
|
|
||||||
|
# Select all existing variables
|
||||||
|
existing_vars = {
|
||||||
|
var.name: var.value for var in conn.execute(sa.select(VariableOld)).all()
|
||||||
|
}
|
||||||
|
|
||||||
|
# Insert all the existing variables as entities
|
||||||
|
if existing_vars:
|
||||||
|
conn.execute(
|
||||||
|
sa.insert(Entity).values(
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'external_id': name,
|
||||||
|
'name': name,
|
||||||
|
'type': 'variable',
|
||||||
|
'plugin': 'variable',
|
||||||
|
}
|
||||||
|
for name in existing_vars
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fetch all the newly inserted variables
|
||||||
|
new_vars = {
|
||||||
|
entity.id: entity.name
|
||||||
|
for entity in conn.execute(
|
||||||
|
sa.select(Entity.id, Entity.name).where(
|
||||||
|
sa.or_(
|
||||||
|
*[
|
||||||
|
sa.and_(
|
||||||
|
Entity.external_id == name,
|
||||||
|
Entity.type == 'variable',
|
||||||
|
Entity.plugin == 'variable',
|
||||||
|
)
|
||||||
|
for name in existing_vars
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
).all()
|
||||||
|
}
|
||||||
|
|
||||||
|
# Insert the mapping on the `variable_new` table
|
||||||
|
op.bulk_insert(
|
||||||
|
VariableNew,
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'id': id,
|
||||||
|
'value': existing_vars.get(name),
|
||||||
|
}
|
||||||
|
for id, name in new_vars.items()
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Rename/drop the tables
|
||||||
|
op.rename_table('variable', 'variable_old')
|
||||||
|
op.rename_table('variable_new', 'variable')
|
||||||
|
op.drop_table('variable_old')
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# Get the connection and the existing `variable` table
|
||||||
|
conn = op.get_bind()
|
||||||
|
metadata = sa.MetaData()
|
||||||
|
metadata.reflect(bind=conn)
|
||||||
|
VariableNew = metadata.tables['variable']
|
||||||
|
|
||||||
|
if VariableNew is None:
|
||||||
|
print('The table `variable` does not exist, skipping migration')
|
||||||
|
return
|
||||||
|
|
||||||
|
# Create the `variable_old` table
|
||||||
|
VariableOld = op.create_table(
|
||||||
|
'variable_old',
|
||||||
|
sa.Column('name', sa.String, primary_key=True, nullable=False),
|
||||||
|
sa.Column('value', sa.String),
|
||||||
|
)
|
||||||
|
|
||||||
|
assert VariableOld is not None, 'Could not create table "variable_old"'
|
||||||
|
|
||||||
|
# Select all existing variables
|
||||||
|
existing_vars = {
|
||||||
|
var.name: var.value
|
||||||
|
for var in conn.execute(
|
||||||
|
sa.select(Entity.name, VariableNew.c.value).join(
|
||||||
|
Entity, Entity.id == VariableNew.c.id
|
||||||
|
)
|
||||||
|
).all()
|
||||||
|
}
|
||||||
|
|
||||||
|
# Insert the mapping on the `variable_old` table
|
||||||
|
if existing_vars:
|
||||||
|
op.bulk_insert(
|
||||||
|
VariableOld,
|
||||||
|
[
|
||||||
|
{
|
||||||
|
'name': name,
|
||||||
|
'value': value,
|
||||||
|
}
|
||||||
|
for name, value in existing_vars.items()
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
# Delete existing references on the `entity` table
|
||||||
|
conn.execute(sa.delete(Entity).where(Entity.type == 'variable'))
|
||||||
|
|
||||||
|
# Rename/drop the tables
|
||||||
|
op.rename_table('variable', 'variable_new')
|
||||||
|
op.rename_table('variable_old', 'variable')
|
||||||
|
op.drop_table('variable_new')
|
|
@ -1,24 +1,13 @@
|
||||||
from sqlalchemy import Column, String
|
from typing import Collection, Dict, Optional
|
||||||
|
from typing_extensions import override
|
||||||
|
|
||||||
from platypush.common.db import declarative_base
|
# from platypush.common.db import Base
|
||||||
from platypush.context import get_plugin
|
from platypush.entities import EntityManager
|
||||||
|
from platypush.entities.variables import Variable
|
||||||
from platypush.plugins import Plugin, action
|
from platypush.plugins import Plugin, action
|
||||||
from platypush.plugins.db import DbPlugin
|
|
||||||
|
|
||||||
Base = declarative_base()
|
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=too-few-public-methods
|
class VariablePlugin(Plugin, EntityManager):
|
||||||
class Variable(Base):
|
|
||||||
"""Models the variable table"""
|
|
||||||
|
|
||||||
__tablename__ = 'variable'
|
|
||||||
|
|
||||||
name = Column(String, primary_key=True, nullable=False)
|
|
||||||
value = Column(String)
|
|
||||||
|
|
||||||
|
|
||||||
class VariablePlugin(Plugin):
|
|
||||||
"""
|
"""
|
||||||
This plugin allows you to manipulate context variables that can be
|
This plugin allows you to manipulate context variables that can be
|
||||||
accessed across your tasks. It requires the :mod:`platypush.plugins.db`
|
accessed across your tasks. It requires the :mod:`platypush.plugins.db`
|
||||||
|
@ -27,39 +16,35 @@ class VariablePlugin(Plugin):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
"""
|
|
||||||
The plugin will create a table named ``variable`` on the database
|
|
||||||
configured in the :mod:`platypush.plugins.db` plugin. You'll have
|
|
||||||
to specify a default ``engine`` in your ``db`` plugin configuration.
|
|
||||||
"""
|
|
||||||
|
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
db_plugin = get_plugin('db')
|
|
||||||
redis_plugin = get_plugin('redis')
|
|
||||||
assert db_plugin, 'Database plugin not configured'
|
|
||||||
assert redis_plugin, 'Redis plugin not configured'
|
|
||||||
|
|
||||||
self.redis_plugin = redis_plugin
|
db = self._db
|
||||||
self.db_plugin: DbPlugin = db_plugin
|
self._db_vars: Dict[str, Optional[str]] = {}
|
||||||
self.db_plugin.create_all(self.db_plugin.get_engine(), Base)
|
""" Local cache for db variables. """
|
||||||
|
|
||||||
|
# db.create_all(db.get_engine(), Base)
|
||||||
|
with db.get_session() as session:
|
||||||
|
self._db_vars.update(
|
||||||
|
{ # type: ignore
|
||||||
|
str(var.name): var.value for var in session.query(Variable).all()
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def get(self, name, default_value=None):
|
def get(self, name: Optional[str] = None, default_value=None):
|
||||||
"""
|
"""
|
||||||
Get the value of a variable by name from the local db.
|
Get the value of a variable by name from the local db.
|
||||||
|
|
||||||
:param name: Variable name
|
:param name: Variable name. If not specified, all the stored variables will be returned.
|
||||||
:type name: str
|
|
||||||
|
|
||||||
:param default_value: What will be returned if the variable is not defined (default: None)
|
:param default_value: What will be returned if the variable is not defined (default: None)
|
||||||
|
|
||||||
:returns: A map in the format ``{"<name>":"<value>"}``
|
:returns: A map in the format ``{"<name>":"<value>"}``
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with self.db_plugin.get_session() as session:
|
return (
|
||||||
var = session.query(Variable).filter_by(name=name).first()
|
{name: self._db_vars.get(name, default_value)}
|
||||||
|
if name is not None
|
||||||
return {name: (var.value if var is not None else default_value)}
|
else self.status().output
|
||||||
|
)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def set(self, **kwargs):
|
def set(self, **kwargs):
|
||||||
|
@ -69,53 +54,32 @@ class VariablePlugin(Plugin):
|
||||||
:param kwargs: Key-value list of variables to set (e.g. ``foo='bar', answer=42``)
|
:param kwargs: Key-value list of variables to set (e.g. ``foo='bar', answer=42``)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with self.db_plugin.get_session() as session:
|
self.publish_entities(kwargs)
|
||||||
existing_vars = {
|
self._db_vars.update(kwargs)
|
||||||
var.name: var
|
|
||||||
for var in session.query(Variable)
|
|
||||||
.filter(Variable.name.in_(kwargs.keys()))
|
|
||||||
.all()
|
|
||||||
}
|
|
||||||
|
|
||||||
new_vars = {
|
|
||||||
name: Variable(name=name, value=value)
|
|
||||||
for name, value in kwargs.items()
|
|
||||||
if name not in existing_vars
|
|
||||||
}
|
|
||||||
|
|
||||||
for name, var in existing_vars.items():
|
|
||||||
var.value = kwargs[name] # type: ignore
|
|
||||||
|
|
||||||
session.add_all([*existing_vars.values(), *new_vars.values()])
|
|
||||||
|
|
||||||
return kwargs
|
return kwargs
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def unset(self, name):
|
def unset(self, name: str):
|
||||||
"""
|
"""
|
||||||
Unset a variable by name if it's set on the local db.
|
Unset a variable by name if it's set on the local db.
|
||||||
|
|
||||||
:param name: Name of the variable to remove
|
:param name: Name of the variable to remove.
|
||||||
:type name: str
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with self.db_plugin.get_session() as session:
|
self.publish_entities({name: None})
|
||||||
session.query(Variable).filter_by(name=name).delete()
|
self._db_vars.pop(name, None)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def mget(self, name):
|
def mget(self, name: str):
|
||||||
"""
|
"""
|
||||||
Get the value of a variable by name from Redis.
|
Get the value of a variable by name from Redis.
|
||||||
|
|
||||||
:param name: Variable name
|
:param name: Variable name
|
||||||
:type name: str
|
|
||||||
|
|
||||||
:returns: A map in the format ``{"<name>":"<value>"}``
|
:returns: A map in the format ``{"<name>":"<value>"}``
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.redis_plugin.mget([name])
|
return self._redis.mget([name])
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def mset(self, **kwargs):
|
def mset(self, **kwargs):
|
||||||
|
@ -123,37 +87,51 @@ class VariablePlugin(Plugin):
|
||||||
Set a variable or a set of variables on Redis.
|
Set a variable or a set of variables on Redis.
|
||||||
|
|
||||||
:param kwargs: Key-value list of variables to set (e.g. ``foo='bar', answer=42``)
|
:param kwargs: Key-value list of variables to set (e.g. ``foo='bar', answer=42``)
|
||||||
|
|
||||||
:returns: A map with the set variables
|
:returns: A map with the set variables
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.redis_plugin.mset(**kwargs)
|
self._redis.mset(**kwargs)
|
||||||
return kwargs
|
return kwargs
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def munset(self, name):
|
def munset(self, name: str):
|
||||||
"""
|
"""
|
||||||
Unset a Redis variable by name if it's set
|
Unset a Redis variable by name if it's set
|
||||||
|
|
||||||
:param name: Name of the variable to remove
|
:param name: Name of the variable to remove
|
||||||
:type name: str
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.redis_plugin.delete(name)
|
return self._redis.delete(name)
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def expire(self, name, expire):
|
def expire(self, name: str, expire: int):
|
||||||
"""
|
"""
|
||||||
Set a variable expiration on Redis
|
Set a variable expiration on Redis
|
||||||
|
|
||||||
:param name: Variable name
|
:param name: Variable name
|
||||||
:type name: str
|
|
||||||
|
|
||||||
:param expire: Expiration time in seconds
|
:param expire: Expiration time in seconds
|
||||||
:type expire: int
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
return self.redis_plugin.expire(name, expire)
|
return self._redis.expire(name, expire)
|
||||||
|
|
||||||
|
@override
|
||||||
|
def transform_entities(self, entities: dict) -> Collection[Variable]:
|
||||||
|
return super().transform_entities(
|
||||||
|
[
|
||||||
|
Variable(id=name, name=name, value=value)
|
||||||
|
for name, value in entities.items()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
@override
|
||||||
|
@action
|
||||||
|
def status(self):
|
||||||
|
variables = {
|
||||||
|
name: value for name, value in self._db_vars.items() if value is not None
|
||||||
|
}
|
||||||
|
|
||||||
|
super().publish_entities(variables)
|
||||||
|
return variables
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
Loading…
Reference in a new issue