Resolve "[Feature Request] Gotify Push Intergration"

This commit is contained in:
Fabio Manganiello 2021-10-01 23:50:53 +02:00
parent e3f0219554
commit 04a5480d19
6 changed files with 345 additions and 5 deletions

View File

@ -3,6 +3,12 @@
All notable changes to this project will be documented in this file.
Given the high speed of development in the first phase, changes are being reported only starting from v0.20.2.
## [0.22.3] - 2021-10-01
## Added
- `gotify` integration (see #198).
## [0.22.2] - 2021-09-25
## Added

View File

@ -0,0 +1,44 @@
from typing import Optional
from platypush.message.event import Event
class GotifyEvent(Event):
"""
Gotify base event.
"""
class GotifyMessageEvent(GotifyEvent):
"""
Event triggered when a message is received on the Gotify instance.
"""
def __init__(self, *args,
message: str,
title: Optional[str] = None,
priority: Optional[int] = None,
extras: Optional[dict] = None,
date: Optional[str] = None,
id: Optional[int] = None,
appid: Optional[int] = None,
**kwargs):
"""
:param message: Message body.
:param title: Message title.
:param priority: Message priority.
:param extras: Message extra payload.
:param date: Delivery datetime.
:param id: Message ID.
:param appid: ID of the sender application.
"""
super().__init__(
*args,
message=message,
title=title,
priority=priority,
extras=extras,
date=date,
id=id,
appid=appid,
**kwargs
)

View File

@ -0,0 +1,200 @@
import json
import multiprocessing
from typing import Optional
import requests
import websocket
from platypush.context import get_bus
from platypush.message.event.gotify import GotifyMessageEvent
from platypush.plugins import RunnablePlugin, action
from platypush.schemas.gotify import GotifyMessageSchema
class GotifyPlugin(RunnablePlugin):
"""
Gotify integration.
`Gotify <https://gotify.net>`_ allows you process messages and notifications asynchronously
over your own devices without relying on 3rd-party cloud services.
Triggers:
* :class:`platypush.message.event.gotify.GotifyMessageEvent` when a new message is received.
"""
def __init__(self, server_url: str, app_token: str, client_token: str, **kwargs):
"""
:param server_url: Base URL of the Gotify server (e.g. ``http://localhost``).
:param app_token: Application token, required to send message and retrieve application info.
You can create a new application under ``http://<server_url>/#/applications``.
:param client_token: Client token, required to subscribe to messages.
You can create a new client under ``http://<server_url>/#/clients``.
"""
super().__init__(**kwargs)
self.server_url = server_url
self.app_token = app_token
self.client_token = client_token
self._ws_app: Optional[websocket.WebSocketApp] = None
self._state_lock = multiprocessing.RLock()
self._connected_event = multiprocessing.Event()
self._disconnected_event = multiprocessing.Event()
self._ws_listener: Optional[multiprocessing.Process] = None
def _execute(self, method: str, endpoint: str, **kwargs) -> dict:
method = method.lower()
rs = getattr(requests, method)(
f'{self.server_url}/{endpoint}',
headers={
'X-Gotify-Key': self.app_token if method == 'post' else self.client_token,
'Content-Type': 'application/json',
**kwargs.pop('headers', {}),
},
**kwargs
)
rs.raise_for_status()
try:
return rs.json()
except Exception as e:
self.logger.debug(e)
def main(self):
self._connect()
stop_events = []
while not any(stop_events):
stop_events = self._should_stop.wait(timeout=1), self._disconnected_event.wait(timeout=1)
def stop(self):
if self._ws_app:
self._ws_app.close()
self._ws_app = None
if self._ws_listener and self._ws_listener.is_alive():
self.logger.info('Terminating websocket process')
self._ws_listener.terminate()
self._ws_listener.join(5)
if self._ws_listener and self._ws_listener.is_alive():
self.logger.warning('Terminating the websocket process failed, killing the process')
self._ws_listener.kill()
if self._ws_listener:
self._ws_listener.join()
self._ws_listener = None
super().stop()
def _connect(self):
with self._state_lock:
if self.should_stop() or self._connected_event.is_set():
return
ws_url = '/'.join([self.server_url.split('/')[0].replace('http', 'ws'), *self.server_url.split('/')[1:]])
self._ws_app = websocket.WebSocketApp(
f'{ws_url}/stream?token={self.client_token}',
on_open=self._on_open(),
on_message=self._on_msg(),
on_error=self._on_error(),
on_close=self._on_close()
)
def server():
self._ws_app.run_forever()
self._ws_listener = multiprocessing.Process(target=server)
self._ws_listener.start()
def _on_open(self):
def hndl(*_):
with self._state_lock:
self._disconnected_event.clear()
self._connected_event.set()
self.logger.info('Connected to the Gotify websocket')
return hndl
@staticmethod
def _on_msg():
def hndl(*args):
data = json.loads(args[1] if len(args) > 1 else args[0])
get_bus().post(GotifyMessageEvent(**GotifyMessageSchema().dump(data)))
return hndl
def _on_error(self):
def hndl(*args):
error = args[1] if len(args) > 1 else args[0]
ws = args[0] if len(args) > 1 else None
self.logger.warning('Gotify websocket error: {}'.format(error))
if ws:
ws.close()
return hndl
def _on_close(self):
def hndl(*_):
with self._state_lock:
self._disconnected_event.set()
self._connected_event.clear()
self.logger.warning('Gotify websocket connection closed')
return hndl
@action
def send_message(self, message: str, title: Optional[str] = None, priority: int = 0, extras: Optional[dict] = None):
"""
Send a message to the server.
:param message: Message body (Markdown is supported).
:param title: Message title.
:param priority: Message priority (default: 0).
:param extras: Extra JSON payload to be passed on the message.
:return: .. schema:: gotify.GotifyMessageSchema
"""
return GotifyMessageSchema().dump(
self._execute('post', 'message', json={
'message': message,
'title': title,
'priority': priority,
'extras': extras or {},
})
)
@action
def get_messages(self, limit: int = 100, since: Optional[int] = None):
"""
Get a list of the messages received on the server.
:param limit: Maximum number of messages to retrieve (default: 100).
:param since: Retrieve the message having ``since`` as minimum ID.
:return: .. schema:: gotify.GotifyMessageSchema(many=True)
"""
return GotifyMessageSchema().dump(
self._execute(
'get', 'message', params={
'limit': limit,
**({'since': since} if since else {}),
}
).get('messages', []), many=True
)
@action
def delete_messages(self, *ids):
"""
Delete messages.
:param ids: If specified, it deletes the messages matching these IDs.
Otherwise, it deletes all the received messages.
"""
if not ids:
self._execute('delete', 'message')
return
for id in ids:
self._execute('delete', f'message/{id}')
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,4 @@
manifest:
events: {}
package: platypush.plugins.gotify
type: plugin

View File

@ -1,6 +1,8 @@
from datetime import datetime
from typing import Optional
from typing import Optional, Union
from dateutil.parser import isoparse
from dateutil.tz import tzutc
from marshmallow import fields
@ -15,9 +17,26 @@ class StrippedString(fields.Function): # lgtm [py/missing-call-to-init]
return value.strip()
def normalize_datetime(dt: str) -> Optional[datetime]:
class DateTime(fields.Function):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metadata = {
'example': datetime.now(tz=tzutc()).isoformat(),
**(self.metadata or {}),
}
def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]:
value = normalize_datetime(obj.get(attr))
if value:
return value.isoformat()
def _deserialize(self, value, attr, data, **kwargs) -> Optional[datetime]:
return normalize_datetime(value)
def normalize_datetime(dt: Union[str, datetime]) -> Optional[datetime]:
if not dt:
return
if dt.endswith('Z'):
dt = dt[:-1] + '+00:00'
return datetime.fromisoformat(dt)
if isinstance(dt, datetime):
return dt
return isoparse(dt)

View File

@ -0,0 +1,67 @@
from marshmallow import fields
from marshmallow.schema import Schema
from . import DateTime
class GotifyMessageSchema(Schema):
title = fields.String(
metadata=dict(
description='Message title',
example='Test title',
)
)
message = fields.String(
required=True,
metadata=dict(
description='Message body (markdown is supported)',
example='Test message',
)
)
priority = fields.Int(
missing=0,
metadata=dict(
description='Message priority',
example=2,
)
)
extras = fields.Dict(
metadata=dict(
description='Extra payload to be delivered with the message',
example={
'home::appliances::lighting::on': {
'brightness': 15
},
'home::appliances::thermostat::change_temperature': {
'temperature': 23
}
},
)
)
id = fields.Int(
required=True,
dump_only=True,
metadata=dict(
description='Message ID',
example=1,
)
)
appid = fields.Int(
dump_only=True,
metadata=dict(
description='ID of the app that posted the message',
example=1,
)
)
date = DateTime(
dump_only=True,
metadata=dict(
description='Message date',
)
)