diff --git a/platypush/entities/alarm.py b/platypush/entities/alarm.py new file mode 100644 index 0000000000..fe68ed0d7d --- /dev/null +++ b/platypush/entities/alarm.py @@ -0,0 +1,35 @@ +from sqlalchemy import Boolean, Column, Float, ForeignKey, Integer, JSON, String + +from platypush.common.db import is_defined + +from . import Entity + + +if not is_defined('alarm'): + + class Alarm(Entity): + """ + Alarm entity model. + """ + + __tablename__ = 'alarm' + + id = Column( + Integer, ForeignKey(Entity.id, ondelete='CASCADE'), primary_key=True + ) + + when = Column(String, nullable=False) + next_run = Column(Float, nullable=True) + enabled = Column(Boolean, nullable=False, default=True) + state = Column(String, nullable=False, default='UNKNOWN') + media = Column(String, nullable=True) + media_plugin = Column(String, nullable=True) + audio_volume = Column(Integer, nullable=True) + snooze_interval = Column(Integer, nullable=True) + actions = Column(JSON, nullable=True) + static = Column(Boolean, nullable=False, default=False) + + __table_args__ = {'extend_existing': True} + __mapper_args__ = { + 'polymorphic_identity': __tablename__, + } diff --git a/platypush/plugins/alarm/__init__.py b/platypush/plugins/alarm/__init__.py index a2045b5730..b9202b4c38 100644 --- a/platypush/plugins/alarm/__init__.py +++ b/platypush/plugins/alarm/__init__.py @@ -1,8 +1,15 @@ +from contextlib import contextmanager import sys -from typing import Optional, Dict, Any, List, Union -from platypush.context import get_plugin +from threading import RLock +from typing import Collection, Generator, Optional, Dict, Any, List, Union +from sqlalchemy.orm import Session + +from platypush.context import get_plugin +from platypush.entities import EntityManager +from platypush.entities.alarm import Alarm as AlarmTable from platypush.plugins import RunnablePlugin, action +from platypush.plugins.db import DbPlugin from platypush.plugins.media import MediaPlugin from platypush.utils import get_plugin_name_by_class from platypush.utils.media import get_default_media_plugin @@ -10,7 +17,7 @@ from platypush.utils.media import get_default_media_plugin from ._model import Alarm, AlarmState -class AlarmPlugin(RunnablePlugin): +class AlarmPlugin(RunnablePlugin, EntityManager): """ Alarm/timer plugin. @@ -84,6 +91,7 @@ class AlarmPlugin(RunnablePlugin): alarms that are configured to play an audio resource. """ super().__init__(poll_interval=poll_interval, **kwargs) + self._db_lock = RLock() alarms = alarms or [] if isinstance(alarms, dict): alarms = [{'name': name, **alarm} for name, alarm in alarms.items()] @@ -106,15 +114,91 @@ class AlarmPlugin(RunnablePlugin): 'No media plugin configured. Alarms that require audio playback will not work' ) - alarms = [ - Alarm( - stop_event=self._should_stop, - **{'media_plugin': self.media_plugin, **alarm}, - ) - for alarm in alarms + self.alarms = { + alarm.name: alarm + for alarm in [ + Alarm( + stop_event=self._should_stop, + static=True, + media_plugin=alarm.pop('media_plugin', self.media_plugin), + on_change=self._on_alarm_update, + **alarm, + ) + for alarm in alarms + ] + } + + self._synced = False + + @property + def _db(self) -> DbPlugin: + db = get_plugin('db') + assert db, 'No database plugin configured' + return db + + @contextmanager + def _get_session(self) -> Generator[Session, None, None]: + with self._db_lock, self._db.get_session() as session: + yield session + + def _merge_alarms(self, alarms: Dict[str, AlarmTable], session: Session): + for name, alarm in alarms.items(): + if name in self.alarms: + existing_alarm = self.alarms[name] + + # If the alarm is static, then we only want to override its + # enabled state from the db record + if existing_alarm.static: + existing_alarm.set_enabled(bool(alarm.enabled)) + else: + # If the alarm record on the db is static, but the alarm is no + # longer present in the configuration, then we want to delete it + if alarm.static: + session.delete(alarm) + else: + self.alarms[name] = Alarm.from_db( + alarm, + stop_event=self._should_stop, + media_plugin=self.media_plugin, + ) + + def _sync_alarms(self): + with self._get_session() as session: + db_alarms = { + str(alarm.name): alarm for alarm in session.query(AlarmTable).all() + } + + self._merge_alarms(db_alarms, session) + self._clear_expired_alarms(session) + for name, alarm in self.alarms.copy().items(): + if not (name in db_alarms or alarm.static): + self.alarms.pop(name, None) + + if not self._synced: + self.publish_entities(self.alarms.values()) + self._synced = True + + def _clear_expired_alarms(self, session: Session): + expired_alarms = [ + alarm + for alarm in self.alarms.values() + if alarm.is_expired() and alarm.is_shut_down() ] - self.alarms: Dict[str, Alarm] = {alarm.name: alarm for alarm in alarms} + if not expired_alarms: + return + + expired_alarm_records = session.query(AlarmTable).filter( + AlarmTable.name.in_([alarm.name for alarm in expired_alarms]) + ) + + for alarm in expired_alarms: + self.alarms.pop(alarm.name, None) + if alarm.static: + continue + + for alarm in expired_alarm_records: + session.delete(alarm) def _get_alarms(self) -> List[Alarm]: return sorted( @@ -142,6 +226,10 @@ class AlarmPlugin(RunnablePlugin): def _disable(self, name: str): self._get_alarm(name).disable() + def _on_alarm_update(self, alarm: Alarm): + with self._db_lock: + self.publish_entities([alarm]) + def _add( self, when: Union[str, int, float], @@ -161,15 +249,22 @@ class AlarmPlugin(RunnablePlugin): media_plugin=self.media_plugin, audio_volume=audio_volume, stop_event=self._should_stop, + on_change=self._on_alarm_update, ) if alarm.name in self.alarms: + assert not self.alarms[alarm.name].static, ( + f'Alarm {alarm.name} is statically defined in the configuration, ' + 'cannot overwrite it programmatically' + ) + self.logger.info('Overwriting existing alarm: %s', alarm.name) self.alarms[alarm.name].stop() self.alarms[alarm.name] = alarm - self.alarms[alarm.name].start() - return self.alarms[alarm.name] + alarm.start() + self.publish_entities([alarm]) + return alarm def _dismiss(self): alarm = self._get_current_alarm() @@ -197,13 +292,9 @@ class AlarmPlugin(RunnablePlugin): audio_file: Optional[str] = None, audio_volume: Optional[Union[int, float]] = None, enabled: bool = True, - ) -> str: + ) -> dict: """ - Add a new alarm. NOTE: alarms that aren't statically defined in the - plugin configuration will only run in the current session. If you want - an alarm to be permanently stored, you should configure it in the alarm - backend configuration. You may want to add an alarm dynamically if it's - a one-time alarm instead. + Add a new alarm. :param when: When the alarm should be executed. It can be either a cron expression (for recurrent alarms), or a datetime string in ISO @@ -215,14 +306,14 @@ class AlarmPlugin(RunnablePlugin): :param media: Path of the audio file to be played. :param audio_volume: Volume of the audio. :param enabled: Whether the new alarm should be enabled (default: True). - :return: The alarm name. + :return: The newly created alarm. """ if audio_file: self.logger.warning( 'The audio_file parameter is deprecated. Use media instead' ) - alarm = self._add( + return self._add( when=when, media=media, audio_file=audio_file, @@ -230,8 +321,7 @@ class AlarmPlugin(RunnablePlugin): name=name, enabled=enabled, audio_volume=audio_volume, - ) - return alarm.name + ).to_dict() @action def enable(self, name: str): @@ -278,7 +368,7 @@ class AlarmPlugin(RunnablePlugin): return self.status() # type: ignore @action - def status(self) -> List[Dict[str, Any]]: + def status(self, *_, **__) -> List[Dict[str, Any]]: """ Get the list of configured alarms and their status. @@ -293,24 +383,40 @@ class AlarmPlugin(RunnablePlugin): "when": "0 8 * * 1-5", "next_run": "2023-12-06T08:00:00.000000", "enabled": true, + "media": "/path/to/media.mp3", + "media_plugin": "media.vlc", + "audio_volume": 10, + "snooze_interval": 300, + "actions": [ + { + "action": "tts.say", + "args": { + "text": "Good morning" + } + }, + { + "action": "light.hue.on" + } + ], "state": "RUNNING" } ] """ - return [alarm.to_dict() for alarm in self._get_alarms()] + ret = [alarm.to_dict() for alarm in self._get_alarms()] + self.publish_entities(self.alarms.values()) + return ret + + def transform_entities(self, entities: Collection[Alarm], **_) -> List[AlarmTable]: + return [alarm.to_db() for alarm in entities] def main(self): + self._sync_alarms() for alarm in self.alarms.values(): alarm.start() while not self.should_stop(): - for name, alarm in self.alarms.copy().items(): - if not alarm.timer or ( - not alarm.timer.is_alive() and alarm.state == AlarmState.SHUTDOWN - ): - del self.alarms[name] - + self._sync_alarms() self.wait_stop(self.poll_interval) def stop(self): diff --git a/platypush/plugins/alarm/_model.py b/platypush/plugins/alarm/_model.py index e7833e65de..369e501a64 100644 --- a/platypush/plugins/alarm/_model.py +++ b/platypush/plugins/alarm/_model.py @@ -3,11 +3,13 @@ import enum import os import time import threading -from typing import Optional, Union +from typing import Callable, Optional, Union import croniter from platypush.context import get_bus, get_plugin +from platypush.entities.alarm import Alarm as AlarmTable +from platypush.message.request import Request from platypush.message.event.alarm import ( AlarmStartedEvent, AlarmDismissedEvent, @@ -49,7 +51,10 @@ class Alarm: snooze_interval: float = 300, poll_interval: float = 5, enabled: bool = True, + static: bool = False, stop_event: Optional[threading.Event] = None, + on_change: Optional[Callable[['Alarm'], None]] = None, + **_, ): with self._id_lock: self._alarms_count += 1 @@ -63,6 +68,7 @@ class Alarm: self.snooze_interval = snooze_interval self.state = AlarmState.UNKNOWN self.timer: Optional[threading.Timer] = None + self.static = static self.actions = Procedure.build( name=name, _async=False, requests=actions or [], id=self.id ) @@ -71,6 +77,11 @@ class Alarm: self._runtime_snooze_interval = snooze_interval self.stop_event = stop_event or threading.Event() self.poll_interval = poll_interval + self.on_change = on_change + + def _on_change(self): + if self.on_change: + self.on_change(self) @staticmethod def _get_media_resource(media: Optional[str]) -> Optional[str]: @@ -113,16 +124,26 @@ class Alarm: def is_enabled(self): return self._enabled + def is_shut_down(self): + return self.state == AlarmState.SHUTDOWN + + def is_expired(self): + return (self.get_next() or 0) < time.time() + def disable(self): - self._enabled = False + self.set_enabled(False) def enable(self): - self._enabled = True + self.set_enabled(True) + + def set_enabled(self, enabled: bool): + self._enabled = enabled def dismiss(self): self.state = AlarmState.DISMISSED self.stop_audio() get_bus().post(AlarmDismissedEvent(name=self.name)) + self._on_change() def snooze(self, interval: Optional[float] = None): self._runtime_snooze_interval = interval or self.snooze_interval @@ -131,14 +152,12 @@ class Alarm: get_bus().post( AlarmSnoozedEvent(name=self.name, interval=self._runtime_snooze_interval) ) + self._on_change() def start(self): if self.timer: self.timer.cancel() - if self.get_next() is None: - return - next_run = self.get_next() if next_run is None: return @@ -147,6 +166,7 @@ class Alarm: self.timer = threading.Timer(interval, self.alarm_callback) self.timer.start() self.state = AlarmState.WAITING + self._on_change() def stop(self): self.state = AlarmState.SHUTDOWN @@ -154,6 +174,8 @@ class Alarm: self.timer.cancel() self.timer = None + self._on_change() + def _get_media_plugin(self) -> MediaPlugin: plugin = get_plugin(self.media_plugin) assert plugin and isinstance(plugin, MediaPlugin), ( @@ -170,7 +192,6 @@ class Alarm: if self.audio_volume is not None: self._get_media_plugin().set_volume(self.audio_volume) - self.state = AlarmState.RUNNING audio_thread = threading.Thread(target=thread) audio_thread.start() @@ -180,7 +201,9 @@ class Alarm: def alarm_callback(self): while not self.should_stop(): if self.is_enabled(): + self.state = AlarmState.RUNNING get_bus().post(AlarmStartedEvent(name=self.name)) + self._on_change() if self.media_plugin and self.media: self.play_audio() @@ -226,17 +249,57 @@ class Alarm: self.stop_event.wait(timeout) def should_stop(self): - return self.stop_event.is_set() or self.state == AlarmState.SHUTDOWN + return self.stop_event.is_set() or (self.is_expired() and self.is_shut_down()) - def to_dict(self): + def to_dict(self) -> dict: return { + 'id': self.name, 'name': self.name, - 'id': self.id, 'when': self.when, 'next_run': self.get_next(), 'enabled': self.is_enabled(), 'state': self.state.name, + 'media': self.media, + 'media_plugin': self.media_plugin, + 'audio_volume': self.audio_volume, + 'snooze_interval': self.snooze_interval, + 'actions': self.actions.requests, + 'static': self.static, } + @classmethod + def from_db(cls, alarm: AlarmTable, **kwargs) -> 'Alarm': + return cls( + when=str(alarm.when), + name=str(alarm.name), + media=alarm.media, # type: ignore + media_plugin=kwargs.pop('media_plugin', alarm.media_plugin), # type: ignore + audio_volume=alarm.audio_volume, # type: ignore + actions=alarm.actions, # type: ignore + snooze_interval=alarm.snooze_interval, # type: ignore + enabled=bool(alarm.enabled), + static=bool(alarm.static), + **kwargs, + ) + + def to_db(self) -> AlarmTable: + return AlarmTable( + id=self.name, + name=self.name, + when=self.when, + state=self.state.name, + next_run=self.get_next(), + media=self.media, + media_plugin=self.media_plugin, + audio_volume=self.audio_volume, + actions=[ + Request.to_dict(req) if isinstance(req, Request) else req + for req in self.actions.requests + ], + snooze_interval=self.snooze_interval, + enabled=self.is_enabled(), + static=self.static, + ) + # vim:sw=4:ts=4:et: