[#340] Added persistence of alarm entities.

This commit is contained in:
Fabio Manganiello 2023-12-09 01:25:12 +01:00
parent 3ffaaa0eb9
commit fcb6b621ab
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
3 changed files with 244 additions and 40 deletions

View file

@ -0,0 +1,35 @@
from sqlalchemy import Boolean, Column, Float, ForeignKey, Integer, JSON, String
from platypush.common.db import is_defined
from . import Entity
if not is_defined('alarm'):
class Alarm(Entity):
"""
Alarm entity model.
"""
__tablename__ = 'alarm'
id = Column(
Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True
)
when = Column(String, nullable=False)
next_run = Column(Float, nullable=True)
enabled = Column(Boolean, nullable=False, default=True)
state = Column(String, nullable=False, default='UNKNOWN')
media = Column(String, nullable=True)
media_plugin = Column(String, nullable=True)
audio_volume = Column(Integer, nullable=True)
snooze_interval = Column(Integer, nullable=True)
actions = Column(JSON, nullable=True)
static = Column(Boolean, nullable=False, default=False)
__table_args__ = {'extend_existing': True}
__mapper_args__ = {
'polymorphic_identity': __tablename__,
}

View file

@ -1,8 +1,15 @@
from contextlib import contextmanager
import sys import sys
from typing import Optional, Dict, Any, List, Union from threading import RLock
from platypush.context import get_plugin from typing import Collection, Generator, Optional, Dict, Any, List, Union
from sqlalchemy.orm import Session
from platypush.context import get_plugin
from platypush.entities import EntityManager
from platypush.entities.alarm import Alarm as AlarmTable
from platypush.plugins import RunnablePlugin, action from platypush.plugins import RunnablePlugin, action
from platypush.plugins.db import DbPlugin
from platypush.plugins.media import MediaPlugin from platypush.plugins.media import MediaPlugin
from platypush.utils import get_plugin_name_by_class from platypush.utils import get_plugin_name_by_class
from platypush.utils.media import get_default_media_plugin from platypush.utils.media import get_default_media_plugin
@ -10,7 +17,7 @@ from platypush.utils.media import get_default_media_plugin
from ._model import Alarm, AlarmState from ._model import Alarm, AlarmState
class AlarmPlugin(RunnablePlugin): class AlarmPlugin(RunnablePlugin, EntityManager):
""" """
Alarm/timer plugin. Alarm/timer plugin.
@ -84,6 +91,7 @@ class AlarmPlugin(RunnablePlugin):
alarms that are configured to play an audio resource. alarms that are configured to play an audio resource.
""" """
super().__init__(poll_interval=poll_interval, **kwargs) super().__init__(poll_interval=poll_interval, **kwargs)
self._db_lock = RLock()
alarms = alarms or [] alarms = alarms or []
if isinstance(alarms, dict): if isinstance(alarms, dict):
alarms = [{'name': name, **alarm} for name, alarm in alarms.items()] alarms = [{'name': name, **alarm} for name, alarm in alarms.items()]
@ -106,15 +114,91 @@ class AlarmPlugin(RunnablePlugin):
'No media plugin configured. Alarms that require audio playback will not work' 'No media plugin configured. Alarms that require audio playback will not work'
) )
alarms = [ self.alarms = {
alarm.name: alarm
for alarm in [
Alarm( Alarm(
stop_event=self._should_stop, stop_event=self._should_stop,
**{'media_plugin': self.media_plugin, **alarm}, static=True,
media_plugin=alarm.pop('media_plugin', self.media_plugin),
on_change=self._on_alarm_update,
**alarm,
) )
for alarm in alarms for alarm in alarms
] ]
}
self.alarms: Dict[str, Alarm] = {alarm.name: alarm for alarm in alarms} self._synced = False
@property
def _db(self) -> DbPlugin:
db = get_plugin('db')
assert db, 'No database plugin configured'
return db
@contextmanager
def _get_session(self) -> Generator[Session, None, None]:
with self._db_lock, self._db.get_session() as session:
yield session
def _merge_alarms(self, alarms: Dict[str, AlarmTable], session: Session):
for name, alarm in alarms.items():
if name in self.alarms:
existing_alarm = self.alarms[name]
# If the alarm is static, then we only want to override its
# enabled state from the db record
if existing_alarm.static:
existing_alarm.set_enabled(bool(alarm.enabled))
else:
# If the alarm record on the db is static, but the alarm is no
# longer present in the configuration, then we want to delete it
if alarm.static:
session.delete(alarm)
else:
self.alarms[name] = Alarm.from_db(
alarm,
stop_event=self._should_stop,
media_plugin=self.media_plugin,
)
def _sync_alarms(self):
with self._get_session() as session:
db_alarms = {
str(alarm.name): alarm for alarm in session.query(AlarmTable).all()
}
self._merge_alarms(db_alarms, session)
self._clear_expired_alarms(session)
for name, alarm in self.alarms.copy().items():
if not (name in db_alarms or alarm.static):
self.alarms.pop(name, None)
if not self._synced:
self.publish_entities(self.alarms.values())
self._synced = True
def _clear_expired_alarms(self, session: Session):
expired_alarms = [
alarm
for alarm in self.alarms.values()
if alarm.is_expired() and alarm.is_shut_down()
]
if not expired_alarms:
return
expired_alarm_records = session.query(AlarmTable).filter(
AlarmTable.name.in_([alarm.name for alarm in expired_alarms])
)
for alarm in expired_alarms:
self.alarms.pop(alarm.name, None)
if alarm.static:
continue
for alarm in expired_alarm_records:
session.delete(alarm)
def _get_alarms(self) -> List[Alarm]: def _get_alarms(self) -> List[Alarm]:
return sorted( return sorted(
@ -142,6 +226,10 @@ class AlarmPlugin(RunnablePlugin):
def _disable(self, name: str): def _disable(self, name: str):
self._get_alarm(name).disable() self._get_alarm(name).disable()
def _on_alarm_update(self, alarm: Alarm):
with self._db_lock:
self.publish_entities([alarm])
def _add( def _add(
self, self,
when: Union[str, int, float], when: Union[str, int, float],
@ -161,15 +249,22 @@ class AlarmPlugin(RunnablePlugin):
media_plugin=self.media_plugin, media_plugin=self.media_plugin,
audio_volume=audio_volume, audio_volume=audio_volume,
stop_event=self._should_stop, stop_event=self._should_stop,
on_change=self._on_alarm_update,
) )
if alarm.name in self.alarms: if alarm.name in self.alarms:
assert not self.alarms[alarm.name].static, (
f'Alarm {alarm.name} is statically defined in the configuration, '
'cannot overwrite it programmatically'
)
self.logger.info('Overwriting existing alarm: %s', alarm.name) self.logger.info('Overwriting existing alarm: %s', alarm.name)
self.alarms[alarm.name].stop() self.alarms[alarm.name].stop()
self.alarms[alarm.name] = alarm self.alarms[alarm.name] = alarm
self.alarms[alarm.name].start() alarm.start()
return self.alarms[alarm.name] self.publish_entities([alarm])
return alarm
def _dismiss(self): def _dismiss(self):
alarm = self._get_current_alarm() alarm = self._get_current_alarm()
@ -197,13 +292,9 @@ class AlarmPlugin(RunnablePlugin):
audio_file: Optional[str] = None, audio_file: Optional[str] = None,
audio_volume: Optional[Union[int, float]] = None, audio_volume: Optional[Union[int, float]] = None,
enabled: bool = True, enabled: bool = True,
) -> str: ) -> dict:
""" """
Add a new alarm. NOTE: alarms that aren't statically defined in the Add a new alarm.
plugin configuration will only run in the current session. If you want
an alarm to be permanently stored, you should configure it in the alarm
backend configuration. You may want to add an alarm dynamically if it's
a one-time alarm instead.
:param when: When the alarm should be executed. It can be either a cron :param when: When the alarm should be executed. It can be either a cron
expression (for recurrent alarms), or a datetime string in ISO expression (for recurrent alarms), or a datetime string in ISO
@ -215,14 +306,14 @@ class AlarmPlugin(RunnablePlugin):
:param media: Path of the audio file to be played. :param media: Path of the audio file to be played.
:param audio_volume: Volume of the audio. :param audio_volume: Volume of the audio.
:param enabled: Whether the new alarm should be enabled (default: True). :param enabled: Whether the new alarm should be enabled (default: True).
:return: The alarm name. :return: The newly created alarm.
""" """
if audio_file: if audio_file:
self.logger.warning( self.logger.warning(
'The audio_file parameter is deprecated. Use media instead' 'The audio_file parameter is deprecated. Use media instead'
) )
alarm = self._add( return self._add(
when=when, when=when,
media=media, media=media,
audio_file=audio_file, audio_file=audio_file,
@ -230,8 +321,7 @@ class AlarmPlugin(RunnablePlugin):
name=name, name=name,
enabled=enabled, enabled=enabled,
audio_volume=audio_volume, audio_volume=audio_volume,
) ).to_dict()
return alarm.name
@action @action
def enable(self, name: str): def enable(self, name: str):
@ -278,7 +368,7 @@ class AlarmPlugin(RunnablePlugin):
return self.status() # type: ignore return self.status() # type: ignore
@action @action
def status(self) -> List[Dict[str, Any]]: def status(self, *_, **__) -> List[Dict[str, Any]]:
""" """
Get the list of configured alarms and their status. Get the list of configured alarms and their status.
@ -293,24 +383,40 @@ class AlarmPlugin(RunnablePlugin):
"when": "0 8 * * 1-5", "when": "0 8 * * 1-5",
"next_run": "2023-12-06T08:00:00.000000", "next_run": "2023-12-06T08:00:00.000000",
"enabled": true, "enabled": true,
"media": "/path/to/media.mp3",
"media_plugin": "media.vlc",
"audio_volume": 10,
"snooze_interval": 300,
"actions": [
{
"action": "tts.say",
"args": {
"text": "Good morning"
}
},
{
"action": "light.hue.on"
}
],
"state": "RUNNING" "state": "RUNNING"
} }
] ]
""" """
return [alarm.to_dict() for alarm in self._get_alarms()] ret = [alarm.to_dict() for alarm in self._get_alarms()]
self.publish_entities(self.alarms.values())
return ret
def transform_entities(self, entities: Collection[Alarm], **_) -> List[AlarmTable]:
return [alarm.to_db() for alarm in entities]
def main(self): def main(self):
self._sync_alarms()
for alarm in self.alarms.values(): for alarm in self.alarms.values():
alarm.start() alarm.start()
while not self.should_stop(): while not self.should_stop():
for name, alarm in self.alarms.copy().items(): self._sync_alarms()
if not alarm.timer or (
not alarm.timer.is_alive() and alarm.state == AlarmState.SHUTDOWN
):
del self.alarms[name]
self.wait_stop(self.poll_interval) self.wait_stop(self.poll_interval)
def stop(self): def stop(self):

View file

@ -3,11 +3,13 @@ import enum
import os import os
import time import time
import threading import threading
from typing import Optional, Union from typing import Callable, Optional, Union
import croniter import croniter
from platypush.context import get_bus, get_plugin from platypush.context import get_bus, get_plugin
from platypush.entities.alarm import Alarm as AlarmTable
from platypush.message.request import Request
from platypush.message.event.alarm import ( from platypush.message.event.alarm import (
AlarmStartedEvent, AlarmStartedEvent,
AlarmDismissedEvent, AlarmDismissedEvent,
@ -49,7 +51,10 @@ class Alarm:
snooze_interval: float = 300, snooze_interval: float = 300,
poll_interval: float = 5, poll_interval: float = 5,
enabled: bool = True, enabled: bool = True,
static: bool = False,
stop_event: Optional[threading.Event] = None, stop_event: Optional[threading.Event] = None,
on_change: Optional[Callable[['Alarm'], None]] = None,
**_,
): ):
with self._id_lock: with self._id_lock:
self._alarms_count += 1 self._alarms_count += 1
@ -63,6 +68,7 @@ class Alarm:
self.snooze_interval = snooze_interval self.snooze_interval = snooze_interval
self.state = AlarmState.UNKNOWN self.state = AlarmState.UNKNOWN
self.timer: Optional[threading.Timer] = None self.timer: Optional[threading.Timer] = None
self.static = static
self.actions = Procedure.build( self.actions = Procedure.build(
name=name, _async=False, requests=actions or [], id=self.id name=name, _async=False, requests=actions or [], id=self.id
) )
@ -71,6 +77,11 @@ class Alarm:
self._runtime_snooze_interval = snooze_interval self._runtime_snooze_interval = snooze_interval
self.stop_event = stop_event or threading.Event() self.stop_event = stop_event or threading.Event()
self.poll_interval = poll_interval self.poll_interval = poll_interval
self.on_change = on_change
def _on_change(self):
if self.on_change:
self.on_change(self)
@staticmethod @staticmethod
def _get_media_resource(media: Optional[str]) -> Optional[str]: def _get_media_resource(media: Optional[str]) -> Optional[str]:
@ -113,16 +124,26 @@ class Alarm:
def is_enabled(self): def is_enabled(self):
return self._enabled return self._enabled
def is_shut_down(self):
return self.state == AlarmState.SHUTDOWN
def is_expired(self):
return (self.get_next() or 0) < time.time()
def disable(self): def disable(self):
self._enabled = False self.set_enabled(False)
def enable(self): def enable(self):
self._enabled = True self.set_enabled(True)
def set_enabled(self, enabled: bool):
self._enabled = enabled
def dismiss(self): def dismiss(self):
self.state = AlarmState.DISMISSED self.state = AlarmState.DISMISSED
self.stop_audio() self.stop_audio()
get_bus().post(AlarmDismissedEvent(name=self.name)) get_bus().post(AlarmDismissedEvent(name=self.name))
self._on_change()
def snooze(self, interval: Optional[float] = None): def snooze(self, interval: Optional[float] = None):
self._runtime_snooze_interval = interval or self.snooze_interval self._runtime_snooze_interval = interval or self.snooze_interval
@ -131,14 +152,12 @@ class Alarm:
get_bus().post( get_bus().post(
AlarmSnoozedEvent(name=self.name, interval=self._runtime_snooze_interval) AlarmSnoozedEvent(name=self.name, interval=self._runtime_snooze_interval)
) )
self._on_change()
def start(self): def start(self):
if self.timer: if self.timer:
self.timer.cancel() self.timer.cancel()
if self.get_next() is None:
return
next_run = self.get_next() next_run = self.get_next()
if next_run is None: if next_run is None:
return return
@ -147,6 +166,7 @@ class Alarm:
self.timer = threading.Timer(interval, self.alarm_callback) self.timer = threading.Timer(interval, self.alarm_callback)
self.timer.start() self.timer.start()
self.state = AlarmState.WAITING self.state = AlarmState.WAITING
self._on_change()
def stop(self): def stop(self):
self.state = AlarmState.SHUTDOWN self.state = AlarmState.SHUTDOWN
@ -154,6 +174,8 @@ class Alarm:
self.timer.cancel() self.timer.cancel()
self.timer = None self.timer = None
self._on_change()
def _get_media_plugin(self) -> MediaPlugin: def _get_media_plugin(self) -> MediaPlugin:
plugin = get_plugin(self.media_plugin) plugin = get_plugin(self.media_plugin)
assert plugin and isinstance(plugin, MediaPlugin), ( assert plugin and isinstance(plugin, MediaPlugin), (
@ -170,7 +192,6 @@ class Alarm:
if self.audio_volume is not None: if self.audio_volume is not None:
self._get_media_plugin().set_volume(self.audio_volume) self._get_media_plugin().set_volume(self.audio_volume)
self.state = AlarmState.RUNNING
audio_thread = threading.Thread(target=thread) audio_thread = threading.Thread(target=thread)
audio_thread.start() audio_thread.start()
@ -180,7 +201,9 @@ class Alarm:
def alarm_callback(self): def alarm_callback(self):
while not self.should_stop(): while not self.should_stop():
if self.is_enabled(): if self.is_enabled():
self.state = AlarmState.RUNNING
get_bus().post(AlarmStartedEvent(name=self.name)) get_bus().post(AlarmStartedEvent(name=self.name))
self._on_change()
if self.media_plugin and self.media: if self.media_plugin and self.media:
self.play_audio() self.play_audio()
@ -226,17 +249,57 @@ class Alarm:
self.stop_event.wait(timeout) self.stop_event.wait(timeout)
def should_stop(self): def should_stop(self):
return self.stop_event.is_set() or self.state == AlarmState.SHUTDOWN return self.stop_event.is_set() or (self.is_expired() and self.is_shut_down())
def to_dict(self): def to_dict(self) -> dict:
return { return {
'id': self.name,
'name': self.name, 'name': self.name,
'id': self.id,
'when': self.when, 'when': self.when,
'next_run': self.get_next(), 'next_run': self.get_next(),
'enabled': self.is_enabled(), 'enabled': self.is_enabled(),
'state': self.state.name, 'state': self.state.name,
'media': self.media,
'media_plugin': self.media_plugin,
'audio_volume': self.audio_volume,
'snooze_interval': self.snooze_interval,
'actions': self.actions.requests,
'static': self.static,
} }
@classmethod
def from_db(cls, alarm: AlarmTable, **kwargs) -> 'Alarm':
return cls(
when=str(alarm.when),
name=str(alarm.name),
media=alarm.media, # type: ignore
media_plugin=kwargs.pop('media_plugin', alarm.media_plugin), # type: ignore
audio_volume=alarm.audio_volume, # type: ignore
actions=alarm.actions, # type: ignore
snooze_interval=alarm.snooze_interval, # type: ignore
enabled=bool(alarm.enabled),
static=bool(alarm.static),
**kwargs,
)
def to_db(self) -> AlarmTable:
return AlarmTable(
id=self.name,
name=self.name,
when=self.when,
state=self.state.name,
next_run=self.get_next(),
media=self.media,
media_plugin=self.media_plugin,
audio_volume=self.audio_volume,
actions=[
Request.to_dict(req) if isinstance(req, Request) else req
for req in self.actions.requests
],
snooze_interval=self.snooze_interval,
enabled=self.is_enabled(),
static=self.static,
)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: