diff --git a/docs/source/events.rst b/docs/source/events.rst index 9d0b63ba..5ca337d9 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -49,6 +49,7 @@ Events platypush/events/nextcloud.rst platypush/events/nfc.rst platypush/events/ngrok.rst + platypush/events/ntfy.rst platypush/events/ping.rst platypush/events/pushbullet.rst platypush/events/qrcode.rst diff --git a/docs/source/platypush/events/ntfy.rst b/docs/source/platypush/events/ntfy.rst new file mode 100644 index 00000000..94aa4a64 --- /dev/null +++ b/docs/source/platypush/events/ntfy.rst @@ -0,0 +1,5 @@ +``ntfy`` +======== + +.. automodule:: platypush.message.event.ntfy + :members: diff --git a/docs/source/platypush/plugins/ntfy.rst b/docs/source/platypush/plugins/ntfy.rst new file mode 100644 index 00000000..d8d60190 --- /dev/null +++ b/docs/source/platypush/plugins/ntfy.rst @@ -0,0 +1,5 @@ +``ntfy`` +======== + +.. automodule:: platypush.plugins.ntfy + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 0d60cbd1..b848e593 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -96,6 +96,7 @@ Plugins platypush/plugins/nextcloud.rst platypush/plugins/ngrok.rst platypush/plugins/nmap.rst + platypush/plugins/ntfy.rst platypush/plugins/otp.rst platypush/plugins/pihole.rst platypush/plugins/ping.rst diff --git a/platypush/message/event/ntfy.py b/platypush/message/event/ntfy.py new file mode 100644 index 00000000..1709cb18 --- /dev/null +++ b/platypush/message/event/ntfy.py @@ -0,0 +1,94 @@ +from typing import Optional, Collection, Mapping + +from platypush.message.event import Event + + +class NotificationEvent(Event): + """ + Event triggered when a message/notification is received on a subscribed + channel. + """ + + def __init__( + self, + *args, + id: str, + topic: str, + message: str, + title: Optional[str] = None, + priority: Optional[int] = None, + time: Optional[int] = None, + attachment: Optional[Mapping] = None, + actions: Optional[Collection[Mapping]] = None, + tags: Optional[Collection[str]] = None, + click_url: Optional[str] = None, + **kwargs + ): + """ + :param id: Message ID. + :param topic: The topic where the message was received. + :param message: Message body. + :param title: Message title. + :param priority: Message priority. + :param time: Message UNIX timestamp. + :param tags: Notification tags. + :param click_url: URL spawned when the notification is clicked. + :param actions: List of actions associated to the notification. + Example: + + .. code-block:: json + + [ + { + "action": "view", + "label": "Open portal", + "url": "https://home.nest.com/", + "clear": true + }, + { + "action": "http", + "label": "Turn down", + "url": "https://api.nest.com/", + "method": "PUT", + "headers": { + "Authorization": "Bearer abcdef..." + }, + "body": "{\\"temperature\\": 65}" + }, + { + "action": "broadcast", + "label": "Take picture", + "intent": "com.myapp.TAKE_PICTURE_INTENT", + "extras": { + "camera": "front" + } + } + ] + + :param attachment: Attachment metadata. Example: + + .. code-block:: json + + { + "name": "image.jpg", + "type": "image/jpeg", + "size": 30017, + "expires": 1654144935, + "url": "https://ntfy.example.com/file/01234abcd.jpg" + } + + """ + super().__init__( + *args, + id=id, + topic=topic, + message=message, + title=title, + priority=priority, + time=time, + tags=tags, + attachment=attachment, + actions=actions, + click_url=click_url, + **kwargs + ) diff --git a/platypush/plugins/ntfy/__init__.py b/platypush/plugins/ntfy/__init__.py new file mode 100644 index 00000000..d197a6b1 --- /dev/null +++ b/platypush/plugins/ntfy/__init__.py @@ -0,0 +1,286 @@ +import asyncio +import json +import multiprocessing +import os +import time +from typing import Optional, Collection, Mapping + +import requests +import websockets + +from platypush.context import get_bus +from platypush.message.event.ntfy import NotificationEvent +from platypush.plugins import RunnablePlugin, action +from platypush.context import get_or_create_event_loop + + +class NtfyPlugin(RunnablePlugin): + """ + Ntfy integration. + + `ntfy `_ allows you to process asynchronous notification + across multiple devices and it's compatible with the + `UnifiedPush ` specification. + + Triggers: + + * :class:`platypush.message.event.ntfy.NotificationEvent` when a new notification is received. + + """ + + def __init__( + self, + server_url: str = 'https://ntfy.sh', + subscriptions: Optional[Collection[str]] = None, + **kwargs, + ): + """ + :param server_url: Default ntfy instance base URL (default: ``https://ntfy.sh``). + :param subscriptions: List of topics the plugin should subscribe to + (default: none). + """ + super().__init__(**kwargs) + self._server_url = server_url + self._ws_url = '/'.join( + [ + self._server_url.split('/')[0].replace('http', 'ws'), + *self._server_url.split('/')[1:], + ] + ) + + self._event_loop: Optional[asyncio.AbstractEventLoop] = None + self._subscriptions = subscriptions or [] + self._ws_proc = None + + def _connect(self): + if self.should_stop(): + self.logger.debug('Already connected') + return + + self._ws_proc = multiprocessing.Process(target=self._ws_process) + self._ws_proc.start() + + while not self._should_stop.is_set(): + self._should_stop.wait(timeout=1) + + async def _get_ws_handler(self, url): + reconnect_wait_secs = 1 + reconnect_wait_secs_max = 60 + + while True: + self.logger.debug(f'Connecting to {url}') + + try: + async with websockets.connect(url) as ws: + reconnect_wait_secs = 1 + self.logger.info(f'Connected to {url}') + async for msg in ws: + try: + msg = json.loads(msg) + except json.JSONDecodeError as e: + self.logger.warning( + 'Received invalid JSON message from the server: %s\n%s', + e, + msg, + ) + continue + + self.logger.debug('Received message on ntfy: %s', msg) + if msg.get('event') == 'message': + get_bus().post( + NotificationEvent( + id=msg['id'], + time=msg['time'], + topic=msg['topic'], + message=msg.get('message'), + title=msg.get('title'), + tags=msg.get('tags'), + click_url=msg.get('click'), + actions=msg.get('actions'), + attachment=msg.get('attachment'), + ) + ) + except websockets.exceptions.WebSocketException as e: + self.logger.error('Websocket error: %s', e) + time.sleep(reconnect_wait_secs) + reconnect_wait_secs = min( + reconnect_wait_secs * 2, reconnect_wait_secs_max + ) + + async def _ws_processor(self, urls): + await asyncio.wait([self._get_ws_handler(url) for url in urls]) + + def _ws_process(self): + self._event_loop = get_or_create_event_loop() + try: + self._event_loop.run_until_complete( + self._ws_processor( + {f'{self._ws_url}/{sub}/ws' for sub in self._subscriptions} + ) + ) + except KeyboardInterrupt: + pass + + def main(self): + if self._subscriptions: + self._connect() + + def stop(self): + if self._ws_proc: + self._ws_proc.kill() + self._ws_proc.join() + self._ws_proc = None + + super().stop() + + @action + def send_message( + self, + topic: str, + message: str = '', + server_url: Optional[str] = None, + username: Optional[str] = None, + password: Optional[str] = None, + title: Optional[str] = None, + click_url: Optional[str] = None, + attachment: Optional[str] = None, + filename: Optional[str] = None, + actions: Optional[Collection[Mapping[str, str]]] = None, + email: Optional[str] = None, + priority: Optional[str] = None, + tags: Optional[Collection[str]] = None, + schedule: Optional[str] = None, + ): + """ + Send a message/notification to a topic. + + :param topic: Topic where the message will be delivered. + :param message: Text of the message to be sent. + :param server_url: Override the default server URL. + :param username: Set if publishing to the topic requires authentication + :param password: Set if publishing to the topic requires authentication + :param title: Custom notification title. + :param click_url: URL that should be opened when the user clicks the + notification. It can be an ``http(s)://`` URL, a ``mailto:`, a + ``geo:``, a link to another ntfy topic (e.g. ``ntfy://mytopic``) or + a Twitter link (e.g. ``twitter://user?screen_name=myname``). + :param attachment: Attach a file or URL to the notification. It can + either be an HTTP URL or a path to a local file. + :param filename: If ``attachment`` is specified, you can override the + output filename (default: same filename as the URL/path base name). + :param actions: List of objects describing possible action buttons + available for the notification. Supported types: + + - ``view``: Open a URL or an app when the action button is + clicked + - ``http``: Send an HTTP request upon action selection. + - ``broadcast``: Send an `Android broadcast ` + intent upon action selection (only available on Android). + + Example: + + .. code-block:: json + + [ + { + "action": "view", + "label": "Open portal", + "url": "https://home.nest.com/", + "clear": true + }, + { + "action": "http", + "label": "Turn down", + "url": "https://api.nest.com/", + "method": "PUT", + "headers": { + "Authorization": "Bearer abcdef..." + }, + "body": "{\\"temperature\\": 65}" + }, + { + "action": "broadcast", + "label": "Take picture", + "intent": "com.myapp.TAKE_PICTURE_INTENT", + "extras": { + "camera": "front" + } + } + ] + + :param email: Forward the notification as an email to the specified + address. + :param priority: Custom notification priority. Supported values: + ``[max, high, default, low, min]``. + :param tags: Optional list of tags associated with the notification. + Tag names that match emoji short codes will be rendered as emojis + in the notification - see `here ` for + a list of supported emojis. + :param schedule: Schedule the message to be delivered at a specific + time (for example, for reminders). Supported formats: + + - UNIX timestamps + - Duration (e.g. ``30m``, ``3h``, ``2 days``) + - Natural language strings (e.g. ``Tuesday, 7am`` or + ``tomorrow, 3pm``) + + """ + method = requests.post + url = server_url or self._server_url + args = {} + if username and password: + args['auth'] = (username, password) + + if attachment and not ( + attachment.startswith('http://') or attachment.startswith('https://') + ): + url = f'{url}/{topic}' + attachment = os.path.expanduser(attachment) + filename = filename or os.path.basename(attachment) + args['headers'] = { + 'Filename': filename, + **({'X-Title': title} if title else {}), + **({'X-Click': click_url} if click_url else {}), + **({'X-Email': email} if email else {}), + **({'X-Priority': priority} if priority else {}), + **({'X-Tags': ','.join(tags)} if tags else {}), + **({'X-Delay': schedule} if schedule else {}), + } + + with open(attachment, 'rb') as f: + args['data'] = f.read() + method = requests.put + else: + args['json'] = { + 'topic': topic, + 'message': message, + **({'title': title} if title else {}), + **({'click': click_url} if click_url else {}), + **({'email': email} if email else {}), + **({'priority': priority} if priority else {}), + **({'tags': tags} if tags else {}), + **({'delay': schedule} if schedule else {}), + **({'actions': actions} if actions else {}), + **( + { + 'attach': attachment, + 'filename': ( + filename + if filename + else attachment.split('/')[-1].split('?')[0] + ), + } + if attachment + else {} + ), + } + + rs = method(url, **args) + assert rs.ok, 'Could not send message to {}: {}'.format( + topic, rs.json().get('error', f'HTTP error: {rs.status_code}') + ) + + return rs.json() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/ntfy/manifest.yaml b/platypush/plugins/ntfy/manifest.yaml new file mode 100644 index 00000000..1317dc7e --- /dev/null +++ b/platypush/plugins/ntfy/manifest.yaml @@ -0,0 +1,5 @@ +manifest: + events: + platypush.message.event.ntfy.NotificationEvent: when a notification is received. + package: platypush.plugins.ntfy + type: plugin diff --git a/setup.cfg b/setup.cfg index 1dd336c2..b26b3223 100644 --- a/setup.cfg +++ b/setup.cfg @@ -8,3 +8,6 @@ description-file = README.md [flake8] max-line-length = 120 +ignore = + SIM105 + W503