platypush/platypush/plugins/alarm/_model.py

430 lines
13 KiB
Python

import datetime
import enum
import os
import time
import threading
from random import randint
from typing import Callable, Optional, Union
import croniter
from dateutil.parser import isoparse
from dateutil.tz import gettz
from platypush.context import get_bus, get_plugin
from platypush.entities.alarm import Alarm as AlarmDb
from platypush.message.request import Request
from platypush.message.event.alarm import (
AlarmDisabledEvent,
AlarmDismissedEvent,
AlarmEnabledEvent,
AlarmSnoozedEvent,
AlarmStartedEvent,
)
from platypush.plugins.media import MediaPlugin, PlayerState
from platypush.procedure import Procedure
class AlarmState(enum.IntEnum):
"""
Alarm states.
"""
WAITING = 1
RUNNING = 2
DISMISSED = 3
SNOOZED = 4
SHUTDOWN = 5
UNKNOWN = -1
class AlarmConditionType(enum.Enum):
"""
Alarm condition types.
"""
CRON = 'cron'
INTERVAL = 'interval'
TIMESTAMP = 'timestamp'
class Alarm:
"""
Alarm model and controller.
"""
def __init__(
self,
when: Union[str, int, float],
actions: Optional[Union[list, Procedure]] = None,
name: Optional[str] = None,
media: Optional[str] = None,
media_plugin: Optional[str] = None,
media_repeat: bool = True,
audio_volume: Optional[Union[int, float]] = None,
snooze_interval: float = 300,
dismiss_interval: float = 300,
poll_interval: float = 2,
enabled: bool = True,
static: bool = False,
stop_event: Optional[threading.Event] = None,
on_change: Optional[Callable[['Alarm'], None]] = None,
**_,
):
self.id = randint(0, 65535)
self.when = when
self.name = name or f'Alarm_{self.id}'
self.media = self._get_media_resource(media)
self.media_plugin = media_plugin
self.media_repeat = media_repeat
self.audio_volume = audio_volume
self.snooze_interval = snooze_interval
self.dismiss_interval = dismiss_interval
self.state = AlarmState.UNKNOWN
self.timer: Optional[threading.Timer] = None
self.static = static
self.actions = (
actions
if isinstance(actions, Procedure)
else Procedure.build(
name=name, _async=False, requests=actions or [], id=self.id
)
)
self._enabled = enabled
self._runtime_snooze_interval = snooze_interval
self.stop_event = stop_event or threading.Event()
self.poll_interval = poll_interval
self.on_change = on_change
self._dismiss_timer: Optional[threading.Timer] = None
def _on_change(self):
if self.on_change:
self.on_change(self)
@staticmethod
def _get_media_resource(media: Optional[str]) -> Optional[str]:
if not media:
return None
if media.startswith('file://'):
media = media[len('file://') :]
media_path = os.path.abspath(os.path.expanduser(media))
if os.path.isfile(media_path):
media = media_path
return media
@property
def is_cron(self) -> bool:
if not isinstance(self.when, str):
return False
try:
croniter.croniter(self.when, time.time()).get_next()
return True
except (AttributeError, croniter.CroniterBadCronError):
return False
@property
def is_interval(self) -> bool:
try:
float(self.when)
return True
except (TypeError, ValueError):
return False
@property
def is_timestamp(self) -> bool:
if not isinstance(self.when, str):
return False
try:
datetime.datetime.fromisoformat(self.when)
return True
except Exception:
return False
@property
def condition_type(self) -> AlarmConditionType:
if self.is_cron:
return AlarmConditionType.CRON
if self.is_interval:
return AlarmConditionType.INTERVAL
if self.is_timestamp:
return AlarmConditionType.TIMESTAMP
raise ValueError(f'Invalid alarm condition {self.when}')
def get_next(self) -> Optional[float]:
now = time.time()
t = 0
try:
# If when is a number, interpret it as number of seconds into the future
delta = float(self.when)
t = now + delta
self.when = datetime.datetime.fromtimestamp(t).isoformat()
except (TypeError, ValueError):
assert isinstance(self.when, str), f'Invalid alarm time {self.when}'
try:
# If when is a cron expression, get the next run time
t = croniter.croniter(
self.when,
datetime.datetime.fromtimestamp(now).replace(tzinfo=gettz()),
).get_next()
except (AttributeError, croniter.CroniterBadCronError):
try:
# If when is an ISO-8601 timestamp, parse it
t = isoparse(self.when).timestamp()
except Exception as e:
raise AssertionError(f'Invalid alarm time {self.when}: {e}') from e
return t if t >= now else None
def is_enabled(self):
return self._enabled
def is_shut_down(self):
return self.state in {
AlarmState.SHUTDOWN,
AlarmState.DISMISSED,
AlarmState.UNKNOWN,
}
def is_expired(self):
return (self.get_next() or 0) < time.time()
def disable(self):
self.set_enabled(False)
def enable(self):
self.set_enabled(True)
def set_enabled(self, enabled: bool):
if enabled == self._enabled:
return
self._enabled = enabled
evt_type = AlarmEnabledEvent if enabled else AlarmDisabledEvent
get_bus().post(evt_type(name=self.name))
self._on_change()
def dismiss(self):
self.state = AlarmState.DISMISSED
self.stop_audio()
self._clear_dismiss_timer()
get_bus().post(AlarmDismissedEvent(name=self.name))
self._on_change()
def snooze(self, interval: Optional[float] = None):
self._runtime_snooze_interval = interval or self.snooze_interval
self.state = AlarmState.SNOOZED
self.stop_audio()
self._clear_dismiss_timer()
get_bus().post(
AlarmSnoozedEvent(name=self.name, interval=self._runtime_snooze_interval)
)
self._on_change()
def start(self):
if self.timer:
self.timer.cancel()
next_run = self.get_next()
if next_run is None:
return
interval = next_run - time.time()
self.state = AlarmState.WAITING
self.timer = threading.Timer(interval, self.alarm_callback)
self.timer.start()
self._clear_dismiss_timer()
self._on_change()
def stop(self):
self.state = AlarmState.SHUTDOWN
self.stop_audio()
if self.timer:
self.timer.cancel()
self.timer = None
self._on_change()
def _clear_dismiss_timer(self):
if self._dismiss_timer:
self._dismiss_timer.cancel()
self._dismiss_timer = None
def _get_media_plugin(self) -> MediaPlugin:
plugin = get_plugin(self.media_plugin)
assert plugin and isinstance(plugin, MediaPlugin), (
f'Invalid audio plugin {self.media_plugin}'
if plugin
else f'Missing audio plugin {self.media_plugin}'
)
return plugin
def play_audio(self):
def thread():
self._get_media_plugin().play(self.media)
if self.audio_volume is not None:
self._get_media_plugin().set_volume(self.audio_volume)
audio_thread = threading.Thread(target=thread)
audio_thread.start()
def stop_audio(self):
self._get_media_plugin().stop()
def _on_start(self):
if self.state != AlarmState.RUNNING:
self._dismiss_timer = threading.Timer(self.dismiss_interval, self.dismiss)
self._dismiss_timer.start()
self.state = AlarmState.RUNNING
get_bus().post(AlarmStartedEvent(name=self.name))
self._on_change()
if self.media_plugin and self.media:
self.play_audio()
self.actions.execute()
def _on_running(self):
sleep_time = None
while not self.should_stop():
plugin_status = self._get_media_plugin().status().output
if not isinstance(plugin_status, dict):
self.wait_stop(self.poll_interval)
continue
state = plugin_status.get('state')
if state == PlayerState.STOP.value:
if self.state == AlarmState.SNOOZED:
sleep_time = self._runtime_snooze_interval
else:
if (
self.media_repeat
and self.state != AlarmState.DISMISSED
and not self.should_stop()
):
self.wait_stop(self.poll_interval)
if not self.should_stop():
self.play_audio()
continue
self.state = AlarmState.WAITING
break
self._on_change()
self.wait_stop(self.poll_interval)
return sleep_time
def alarm_callback(self):
while not self.should_stop():
if self.is_enabled():
self._on_start()
elif self.state != AlarmState.WAITING:
self.state = AlarmState.WAITING
self._on_change()
self.wait_stop(self.poll_interval)
sleep_time = None
if self.state == AlarmState.RUNNING:
sleep_time = self._on_running()
if self.state == AlarmState.SNOOZED:
sleep_time = self._runtime_snooze_interval
elif self.get_next() is None:
self.state = AlarmState.SHUTDOWN
break
if not sleep_time:
next_run = self.get_next()
sleep_time = (
next_run - time.time()
if next_run is not None
else self.poll_interval
)
self.stop_event.wait(sleep_time)
def wait_stop(self, timeout: Optional[float] = None):
self.stop_event.wait(timeout)
def should_stop(self):
return (
self.stop_event.is_set()
or (self.is_expired() and self.state == AlarmState.DISMISSED)
or self.state == AlarmState.SHUTDOWN
)
def to_dict(self) -> dict:
return {
'id': self.name,
'type': 'alarm',
'name': self.name,
'when': self.when,
'next_run': self.get_next(),
'enabled': self.is_enabled(),
'state': self.state.name,
'media': self.media,
'media_plugin': self.media_plugin,
'media_repeat': self.media_repeat,
'audio_volume': self.audio_volume,
'snooze_interval': self.snooze_interval,
'dismiss_interval': self.dismiss_interval,
'actions': self.actions.requests,
'static': self.static,
'condition_type': self.condition_type.value,
}
@classmethod
def from_db(cls, alarm: AlarmDb, **kwargs) -> 'Alarm':
return cls(
when=str(alarm.when),
name=str(alarm.name),
media=alarm.media, # type: ignore
media_plugin=kwargs.pop('media_plugin', alarm.media_plugin), # type: ignore
media_repeat=alarm.media_repeat, # type: ignore
audio_volume=alarm.audio_volume, # type: ignore
actions=alarm.actions, # type: ignore
snooze_interval=alarm.snooze_interval, # type: ignore
dismiss_interval=alarm.dismiss_interval, # type: ignore
enabled=bool(alarm.enabled),
static=bool(alarm.static),
state=getattr(AlarmState, str(alarm.state)),
**kwargs,
)
def to_db(self) -> AlarmDb:
return AlarmDb(
id=self.name,
name=self.name,
when=self.when,
state=self.state.name,
next_run=self.get_next(),
media=self.media,
media_plugin=self.media_plugin,
media_repeat=self.media_repeat,
audio_volume=self.audio_volume,
actions=[
Request.to_dict(req) if isinstance(req, Request) else req
for req in self.actions.requests
],
snooze_interval=self.snooze_interval,
dismiss_interval=self.dismiss_interval,
enabled=self.is_enabled(),
static=self.static,
condition_type=self.condition_type.value,
)
# vim:sw=4:ts=4:et: