diff --git a/docs/source/conf.py b/docs/source/conf.py index a3f641d2a5..9bbdb852a7 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -138,15 +138,12 @@ latex_elements = { # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -156,8 +153,7 @@ latex_elements = { # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'platypush.tex', 'platypush Documentation', - 'BlackLight', 'manual'), + (master_doc, 'platypush.tex', 'platypush Documentation', 'BlackLight', 'manual'), ] @@ -165,10 +161,7 @@ latex_documents = [ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, 'platypush', 'platypush Documentation', - [author], 1) -] +man_pages = [(master_doc, 'platypush', 'platypush Documentation', [author], 1)] # -- Options for Texinfo output ---------------------------------------------- @@ -177,9 +170,15 @@ man_pages = [ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'platypush', 'platypush Documentation', - author, 'platypush', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + 'platypush', + 'platypush Documentation', + author, + 'platypush', + 'One line description of project.', + 'Miscellaneous', + ), ] @@ -199,99 +198,103 @@ autodoc_default_options = { 'inherited-members': True, } -autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers', - 'google.assistant.embedded', - 'google.assistant.library', - 'google.assistant.library.event', - 'google.assistant.library.file_helpers', - 'google.oauth2.credentials', - 'oauth2client', - 'apiclient', - 'tenacity', - 'smartcard', - 'Leap', - 'oauth2client', - 'rtmidi', - 'bluetooth', - 'gevent.wsgi', - 'Adafruit_IO', - 'pyperclip', - 'pydbus', - 'inputs', - 'inotify', - 'omxplayer', - 'plexapi', - 'cwiid', - 'sounddevice', - 'soundfile', - 'numpy', - 'cv2', - 'nfc', - 'ndef', - 'bcrypt', - 'google', - 'feedparser', - 'kafka', - 'googlesamples', - 'icalendar', - 'httplib2', - 'mpd', - 'serial', - 'pyHS100', - 'grpc', - 'envirophat', - 'gps', - 'picamera', - 'pmw3901', - 'PIL', - 'croniter', - 'pyaudio', - 'avs', - 'PyOBEX', - 'todoist', - 'trello', - 'telegram', - 'telegram.ext', - 'pyfirmata2', - 'cups', - 'graphyte', - 'cpuinfo', - 'psutil', - 'openzwave', - 'deepspeech', - 'wave', - 'pvporcupine ', - 'pvcheetah', - 'pyotp', - 'linode_api4', - 'pyzbar', - 'tensorflow', - 'keras', - 'pandas', - 'samsungtvws', - 'paramiko', - 'luma', - 'zeroconf', - 'dbus', - 'gi', - 'gi.repository', - 'twilio', - 'Adafruit_Python_DHT', - 'RPi.GPIO', - 'RPLCD', - 'imapclient', - 'pysmartthings', - 'aiohttp', - 'watchdog', - 'pyngrok', - 'irc', - 'irc.bot', - 'irc.strings', - 'irc.client', - 'irc.connection', - 'irc.events', - 'defusedxml', - ] +autodoc_mock_imports = [ + 'googlesamples.assistant.grpc.audio_helpers', + 'google.assistant.embedded', + 'google.assistant.library', + 'google.assistant.library.event', + 'google.assistant.library.file_helpers', + 'google.oauth2.credentials', + 'oauth2client', + 'apiclient', + 'tenacity', + 'smartcard', + 'Leap', + 'oauth2client', + 'rtmidi', + 'bluetooth', + 'gevent.wsgi', + 'Adafruit_IO', + 'pyperclip', + 'pydbus', + 'inputs', + 'inotify', + 'omxplayer', + 'plexapi', + 'cwiid', + 'sounddevice', + 'soundfile', + 'numpy', + 'cv2', + 'nfc', + 'ndef', + 'bcrypt', + 'google', + 'feedparser', + 'kafka', + 'googlesamples', + 'icalendar', + 'httplib2', + 'mpd', + 'serial', + 'pyHS100', + 'grpc', + 'envirophat', + 'gps', + 'picamera', + 'pmw3901', + 'PIL', + 'croniter', + 'pyaudio', + 'avs', + 'PyOBEX', + 'todoist', + 'trello', + 'telegram', + 'telegram.ext', + 'pyfirmata2', + 'cups', + 'graphyte', + 'cpuinfo', + 'psutil', + 'openzwave', + 'deepspeech', + 'wave', + 'pvporcupine ', + 'pvcheetah', + 'pyotp', + 'linode_api4', + 'pyzbar', + 'tensorflow', + 'keras', + 'pandas', + 'samsungtvws', + 'paramiko', + 'luma', + 'zeroconf', + 'dbus', + 'gi', + 'gi.repository', + 'twilio', + 'Adafruit_Python_DHT', + 'RPi.GPIO', + 'RPLCD', + 'imapclient', + 'pysmartthings', + 'aiohttp', + 'watchdog', + 'pyngrok', + 'irc', + 'irc.bot', + 'irc.strings', + 'irc.client', + 'irc.connection', + 'irc.events', + 'defusedxml', + 'nio', + 'aiofiles', + 'aiofiles.os', +] sys.path.insert(0, os.path.abspath('../..')) diff --git a/docs/source/events.rst b/docs/source/events.rst index 5ca337d9d2..13d3ed79d1 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -41,6 +41,7 @@ Events platypush/events/linode.rst platypush/events/log.http.rst platypush/events/mail.rst + platypush/events/matrix.rst platypush/events/media.rst platypush/events/midi.rst platypush/events/mqtt.rst @@ -72,6 +73,7 @@ Events platypush/events/weather.rst platypush/events/web.rst platypush/events/web.widget.rst + platypush/events/websocket.rst platypush/events/wiimote.rst platypush/events/zeroborg.rst platypush/events/zeroconf.rst diff --git a/docs/source/platypush/events/matrix.rst b/docs/source/platypush/events/matrix.rst new file mode 100644 index 0000000000..eaad6da652 --- /dev/null +++ b/docs/source/platypush/events/matrix.rst @@ -0,0 +1,5 @@ +``matrix`` +========== + +.. automodule:: platypush.message.event.matrix + :members: diff --git a/docs/source/platypush/events/websocket.rst b/docs/source/platypush/events/websocket.rst new file mode 100644 index 0000000000..7ec41a2897 --- /dev/null +++ b/docs/source/platypush/events/websocket.rst @@ -0,0 +1,5 @@ +``websocket`` +============= + +.. automodule:: platypush.message.event.websocket + :members: diff --git a/docs/source/platypush/plugins/matrix.rst b/docs/source/platypush/plugins/matrix.rst new file mode 100644 index 0000000000..1572b1f989 --- /dev/null +++ b/docs/source/platypush/plugins/matrix.rst @@ -0,0 +1,5 @@ +``matrix`` +========== + +.. automodule:: platypush.plugins.matrix + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index b848e5930d..41be8a4710 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -75,6 +75,7 @@ Plugins platypush/plugins/mail.smtp.rst platypush/plugins/mailgun.rst platypush/plugins/mastodon.rst + platypush/plugins/matrix.rst platypush/plugins/media.chromecast.rst platypush/plugins/media.gstreamer.rst platypush/plugins/media.jellyfin.rst diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index e9acfec451..481bd27518 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -213,11 +213,10 @@ class Config: config['scripts_dir'] = os.path.abspath( os.path.expanduser(file_config[section]) ) - elif ( - 'disabled' not in file_config[section] - or file_config[section]['disabled'] is False - ): - config[section] = file_config[section] + else: + section_config = file_config.get(section, {}) or {} + if not section_config.get('disabled'): + config[section] = section_config return config diff --git a/platypush/message/event/matrix.py b/platypush/message/event/matrix.py new file mode 100644 index 0000000000..04ed49769a --- /dev/null +++ b/platypush/message/event/matrix.py @@ -0,0 +1,250 @@ +from datetime import datetime +from typing import Dict, Any + +from platypush.message.event import Event + + +class MatrixEvent(Event): + """ + Base matrix event. + """ + + def __init__( + self, + *args, + server_url: str, + sender_id: str | None = None, + sender_display_name: str | None = None, + sender_avatar_url: str | None = None, + room_id: str | None = None, + room_name: str | None = None, + room_topic: str | None = None, + server_timestamp: datetime | None = None, + **kwargs + ): + """ + :param server_url: Base server URL. + :param sender_id: The event's sender ID. + :param sender_display_name: The event's sender display name. + :param sender_avatar_url: The event's sender avatar URL. + :param room_id: Event room ID. + :param room_name: The name of the room associated to the event. + :param room_topic: The topic of the room associated to the event. + :param server_timestamp: The server timestamp of the event. + """ + evt_args: Dict[str, Any] = { + 'server_url': server_url, + } + + if sender_id: + evt_args['sender_id'] = sender_id + if sender_display_name: + evt_args['sender_display_name'] = sender_display_name + if sender_avatar_url: + evt_args['sender_avatar_url'] = sender_avatar_url + if room_id: + evt_args['room_id'] = room_id + if room_name: + evt_args['room_name'] = room_name + if room_topic: + evt_args['room_topic'] = room_topic + if server_timestamp: + evt_args['server_timestamp'] = server_timestamp + + super().__init__(*args, **evt_args, **kwargs) + + +class MatrixSyncEvent(MatrixEvent): + """ + Event triggered when the startup synchronization has been completed and the + plugin is ready to use. + """ + + +class MatrixMessageEvent(MatrixEvent): + """ + Event triggered when a message is received on a subscribed room. + """ + + def __init__( + self, + *args, + body: str = '', + url: str | None = None, + thumbnail_url: str | None = None, + mimetype: str | None = None, + formatted_body: str | None = None, + format: str | None = None, + **kwargs + ): + """ + :param body: The body of the message. + :param url: The URL of the media file, if the message includes media. + :param thumbnail_url: The URL of the thumbnail, if the message includes media. + :param mimetype: The MIME type of the media file, if the message includes media. + :param formatted_body: The formatted body, if ``format`` is specified. + :param format: The format of the message (e.g. ``html`` or ``markdown``). + """ + super().__init__( + *args, + body=body, + url=url, + thumbnail_url=thumbnail_url, + mimetype=mimetype, + formatted_body=formatted_body, + format=format, + **kwargs + ) + + +class MatrixMessageImageEvent(MatrixEvent): + """ + Event triggered when a message containing an image is received. + """ + + +class MatrixMessageFileEvent(MatrixEvent): + """ + Event triggered when a message containing a generic file is received. + """ + + +class MatrixMessageAudioEvent(MatrixEvent): + """ + Event triggered when a message containing an audio file is received. + """ + + +class MatrixMessageVideoEvent(MatrixEvent): + """ + Event triggered when a message containing a video file is received. + """ + + +class MatrixReactionEvent(MatrixEvent): + """ + Event triggered when a user submits a reaction to an event. + """ + + def __init__(self, *args, in_response_to_event_id: str, **kwargs): + """ + :param in_response_to_event_id: The ID of the URL related to the reaction. + """ + super().__init__( + *args, in_response_to_event_id=in_response_to_event_id, **kwargs + ) + + +class MatrixEncryptedMessageEvent(MatrixMessageEvent): + """ + Event triggered when a message is received but the client doesn't + have the E2E keys to decrypt it, or encryption has not been enabled. + """ + + +class MatrixCallEvent(MatrixEvent): + """ + Base class for Matrix call events. + """ + + def __init__( + self, *args, call_id: str, version: int, sdp: str | None = None, **kwargs + ): + """ + :param call_id: The unique ID of the call. + :param version: An increasing integer representing the version of the call. + :param sdp: SDP text of the session description. + """ + super().__init__(*args, call_id=call_id, version=version, sdp=sdp, **kwargs) + + +class MatrixCallInviteEvent(MatrixCallEvent): + """ + Event triggered when the user is invited to a call. + """ + + def __init__(self, *args, invite_validity: float | None = None, **kwargs): + """ + :param invite_validity: For how long the invite will be valid, in seconds. + :param sdp: SDP text of the session description. + """ + super().__init__(*args, invite_validity=invite_validity, **kwargs) + + +class MatrixCallAnswerEvent(MatrixCallEvent): + """ + Event triggered by the callee when they wish to answer the call. + """ + + +class MatrixCallHangupEvent(MatrixCallEvent): + """ + Event triggered when a participant in the call exists. + """ + + +class MatrixRoomCreatedEvent(MatrixEvent): + """ + Event triggered when a room is created. + """ + + +class MatrixRoomJoinEvent(MatrixEvent): + """ + Event triggered when a user joins a room. + """ + + +class MatrixRoomLeaveEvent(MatrixEvent): + """ + Event triggered when a user leaves a room. + """ + + +class MatrixRoomInviteEvent(MatrixEvent): + """ + Event triggered when the user is invited to a room. + """ + + +class MatrixRoomTopicChangedEvent(MatrixEvent): + """ + Event triggered when the topic/title of a room changes. + """ + + def __init__(self, *args, topic: str, **kwargs): + """ + :param topic: New room topic. + """ + super().__init__(*args, topic=topic, **kwargs) + + +class MatrixRoomTypingStartEvent(MatrixEvent): + """ + Event triggered when a user in a room starts typing. + """ + + +class MatrixRoomTypingStopEvent(MatrixEvent): + """ + Event triggered when a user in a room stops typing. + """ + + +class MatrixRoomSeenReceiptEvent(MatrixEvent): + """ + Event triggered when the last message seen by a user in a room is updated. + """ + + +class MatrixUserPresenceEvent(MatrixEvent): + """ + Event triggered when a user comes online or goes offline. + """ + + def __init__(self, *args, is_active: bool, last_active: datetime | None, **kwargs): + """ + :param is_active: True if the user is currently online. + :param topic: When the user was last active. + """ + super().__init__(*args, is_active=is_active, last_active=last_active, **kwargs) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 40d189d4d0..7b895fdf33 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -12,17 +12,30 @@ from platypush.config import Config from platypush.context import get_plugin from platypush.message import Message from platypush.message.response import Response -from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \ - is_functional_procedure +from platypush.utils import ( + get_hash, + get_module_and_method_from_action, + get_redis_queue_name_by_message, + is_functional_procedure, +) logger = logging.getLogger('platypush') class Request(Message): - """ Request message class """ + """Request message class""" - def __init__(self, target, action, origin=None, id=None, backend=None, - args=None, token=None, timestamp=None): + def __init__( + self, + target, + action, + origin=None, + id=None, + backend=None, + args=None, + token=None, + timestamp=None, + ): """ Params: target -- Target node [Str] @@ -48,9 +61,13 @@ class Request(Message): @classmethod def build(cls, msg): msg = super().parse(msg) - args = {'target': msg.get('target', Config.get('device_id')), 'action': msg['action'], - 'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(), - 'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()} + args = { + 'target': msg.get('target', Config.get('device_id')), + 'action': msg['action'], + 'args': msg.get('args', {}), + 'id': msg['id'] if 'id' in msg else cls._generate_id(), + 'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(), + } if 'origin' in msg: args['origin'] = msg['origin'] @@ -61,7 +78,7 @@ class Request(Message): @staticmethod def _generate_id(): _id = '' - for i in range(0, 16): + for _ in range(0, 16): _id += '%.2x' % random.randint(0, 255) return _id @@ -84,9 +101,14 @@ class Request(Message): return proc_config(*args, **kwargs) - proc = Procedure.build(name=proc_name, requests=proc_config['actions'], - _async=proc_config['_async'], args=self.args, - backend=self.backend, id=self.id) + proc = Procedure.build( + name=proc_name, + requests=proc_config['actions'], + _async=proc_config['_async'], + args=self.args, + backend=self.backend, + id=self.id, + ) return proc.execute(*args, **kwargs) @@ -112,7 +134,7 @@ class Request(Message): if isinstance(value, str): value = self.expand_value_from_context(value, **context) - elif isinstance(value, dict) or isinstance(value, list): + elif isinstance(value, (dict, list)): self._expand_context(event_args=value, **context) event_args[key] = value @@ -132,7 +154,11 @@ class Request(Message): try: exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v))) except Exception as e: - logger.debug('Could not set context variable {}={}: {}'.format(k, v, str(e))) + logger.debug( + 'Could not set context variable {}={}: {}'.format( + k, v, str(e) + ) + ) logger.debug('Context: {}'.format(context)) parsed_value = '' @@ -152,7 +178,7 @@ class Request(Message): if callable(context_value): context_value = context_value() - if isinstance(context_value, range) or isinstance(context_value, tuple): + if isinstance(context_value, (range, tuple)): context_value = [*context_value] if isinstance(context_value, datetime.date): context_value = context_value.isoformat() @@ -162,7 +188,7 @@ class Request(Message): parsed_value += prefix + ( json.dumps(context_value) - if isinstance(context_value, list) or isinstance(context_value, dict) + if isinstance(context_value, (list, dict)) else str(context_value) ) else: @@ -205,6 +231,9 @@ class Request(Message): """ def _thread_func(_n_tries, errors=None): + from platypush.context import get_bus + from platypush.plugins import RunnablePlugin + response = None try: @@ -221,11 +250,15 @@ class Request(Message): return response else: action = self.expand_value_from_context(self.action, **context) - (module_name, method_name) = get_module_and_method_from_action(action) + (module_name, method_name) = get_module_and_method_from_action( + action + ) plugin = get_plugin(module_name) except Exception as e: logger.exception(e) - msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(self.action, str(e)) + msg = 'Uncaught pre-processing exception from action [{}]: {}'.format( + self.action, str(e) + ) logger.warning(msg) response = Response(output=None, errors=[msg]) self._send_response(response) @@ -243,24 +276,36 @@ class Request(Message): response = plugin.run(method_name, args) if not response: - logger.warning('Received null response from action {}'.format(action)) + logger.warning( + 'Received null response from action {}'.format(action) + ) else: if response.is_error(): - logger.warning(('Response processed with errors from ' + - 'action {}: {}').format( - action, str(response))) + logger.warning( + ( + 'Response processed with errors from ' + 'action {}: {}' + ).format(action, str(response)) + ) elif not response.disable_logging: - logger.info('Processed response from action {}: {}'. - format(action, str(response))) + logger.info( + 'Processed response from action {}: {}'.format( + action, str(response) + ) + ) except (AssertionError, TimeoutError) as e: - plugin.logger.exception(e) - logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e))) + logger.warning( + '%s from action [%s]: %s', e.__class__.__name__, action, str(e) + ) response = Response(output=None, errors=[str(e)]) except Exception as e: # Retry mechanism plugin.logger.exception(e) - logger.warning(('Uncaught exception while processing response ' + - 'from action [{}]: {}').format(action, str(e))) + logger.warning( + ( + 'Uncaught exception while processing response ' + + 'from action [{}]: {}' + ).format(action, str(e)) + ) errors = errors or [] if str(e) not in errors: @@ -269,17 +314,21 @@ class Request(Message): response = Response(output=None, errors=errors) if _n_tries - 1 > 0: logger.info('Reloading plugin {} and retrying'.format(module_name)) - get_plugin(module_name, reload=True) - response = _thread_func(_n_tries=_n_tries-1, errors=errors) + plugin = get_plugin(module_name, reload=True) + if isinstance(plugin, RunnablePlugin): + plugin.bus = get_bus() + plugin.start() + + response = _thread_func(_n_tries=_n_tries - 1, errors=errors) finally: self._send_response(response) - return response - token_hash = Config.get('token_hash') + return response - if token_hash: - if self.token is None or get_hash(self.token) != token_hash: - raise PermissionError() + stored_token_hash = Config.get('token_hash') + token = getattr(self, 'token', '') + if stored_token_hash and get_hash(token) != stored_token_hash: + raise PermissionError() if _async: Thread(target=_thread_func, args=(n_tries,)).start() @@ -292,15 +341,18 @@ class Request(Message): the message into a UTF-8 JSON string """ - return json.dumps({ - 'type': 'request', - 'target': self.target, - 'action': self.action, - 'args': self.args, - 'origin': self.origin if hasattr(self, 'origin') else None, - 'id': self.id if hasattr(self, 'id') else None, - 'token': self.token if hasattr(self, 'token') else None, - '_timestamp': self.timestamp, - }) + return json.dumps( + { + 'type': 'request', + 'target': self.target, + 'action': self.action, + 'args': self.args, + 'origin': self.origin if hasattr(self, 'origin') else None, + 'id': self.id if hasattr(self, 'id') else None, + 'token': self.token if hasattr(self, 'token') else None, + '_timestamp': self.timestamp, + } + ) + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index 9e3edab851..f35f06a4e6 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -21,12 +21,12 @@ def action(f): result = f(*args, **kwargs) if result and isinstance(result, Response): - result.errors = result.errors \ - if isinstance(result.errors, list) else [result.errors] + result.errors = ( + result.errors if isinstance(result.errors, list) else [result.errors] + ) response = result elif isinstance(result, tuple) and len(result) == 2: - response.errors = result[1] \ - if isinstance(result[1], list) else [result[1]] + response.errors = result[1] if isinstance(result[1], list) else [result[1]] if len(response.errors) == 1 and response.errors[0] is None: response.errors = [] @@ -41,12 +41,14 @@ def action(f): return _execute_action -class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init] - """ Base plugin class """ +class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init] + """Base plugin class""" def __init__(self, **kwargs): super().__init__() - self.logger = logging.getLogger('platypush:plugin:' + get_plugin_name_by_class(self.__class__)) + self.logger = logging.getLogger( + 'platypush:plugin:' + get_plugin_name_by_class(self.__class__) + ) if 'logging' in kwargs: self.logger.setLevel(getattr(logging, kwargs['logging'].upper())) @@ -55,8 +57,9 @@ class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-t ) def run(self, method, *args, **kwargs): - assert method in self.registered_actions, '{} is not a registered action on {}'.\ - format(method, self.__class__.__name__) + assert ( + method in self.registered_actions + ), '{} is not a registered action on {}'.format(method, self.__class__.__name__) return getattr(self, method)(*args, **kwargs) @@ -64,6 +67,7 @@ class RunnablePlugin(Plugin): """ Class for runnable plugins - i.e. plugins that have a start/stop method and can be started. """ + def __init__(self, poll_interval: Optional[float] = None, **kwargs): """ :param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval). @@ -80,6 +84,9 @@ class RunnablePlugin(Plugin): def should_stop(self): return self._should_stop.is_set() + def wait_stop(self, timeout=None): + return self._should_stop.wait(timeout=timeout) + def start(self): set_thread_name(self.__class__.__name__) self._thread = threading.Thread(target=self._runner) diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py new file mode 100644 index 0000000000..d31bce4d66 --- /dev/null +++ b/platypush/plugins/matrix/__init__.py @@ -0,0 +1,1674 @@ +import asyncio +import datetime +import json +import logging +import os +import pathlib +import re +import threading + +from dataclasses import dataclass +from typing import Collection, Coroutine, Dict, Sequence +from urllib.parse import urlparse + +from async_lru import alru_cache +from nio import ( + Api, + AsyncClient, + AsyncClientConfig, + CallAnswerEvent, + CallHangupEvent, + CallInviteEvent, + ErrorResponse, + Event, + InviteEvent, + KeyVerificationStart, + KeyVerificationAccept, + KeyVerificationMac, + KeyVerificationKey, + KeyVerificationCancel, + LocalProtocolError, + LoginResponse, + MatrixRoom, + MegolmEvent, + ProfileGetResponse, + RoomCreateEvent, + RoomEncryptedAudio, + RoomEncryptedFile, + RoomEncryptedImage, + RoomEncryptedMedia, + RoomEncryptedVideo, + RoomGetEventError, + RoomGetStateResponse, + RoomMemberEvent, + RoomMessage, + RoomMessageAudio, + RoomMessageFile, + RoomMessageFormatted, + RoomMessageText, + RoomMessageImage, + RoomMessageMedia, + RoomMessageVideo, + RoomTopicEvent, + RoomUpgradeEvent, + StickerEvent, + SyncResponse, + ToDeviceError, + UnknownEncryptedEvent, + UnknownEvent, +) + +import aiofiles +import aiofiles.os +from nio.api import MessageDirection, RoomVisibility + +from nio.client.async_client import client_session +from nio.client.base_client import logged_in +from nio.crypto import decrypt_attachment +from nio.crypto.device import OlmDevice +from nio.events.ephemeral import ReceiptEvent, TypingNoticeEvent +from nio.events.presence import PresenceEvent +from nio.exceptions import OlmUnverifiedDeviceError +from nio.responses import DownloadResponse, RoomMessagesResponse + +from platypush.config import Config +from platypush.context import get_bus +from platypush.message.event.matrix import ( + MatrixCallAnswerEvent, + MatrixCallHangupEvent, + MatrixCallInviteEvent, + MatrixEncryptedMessageEvent, + MatrixMessageAudioEvent, + MatrixMessageEvent, + MatrixMessageFileEvent, + MatrixMessageImageEvent, + MatrixMessageVideoEvent, + MatrixReactionEvent, + MatrixRoomCreatedEvent, + MatrixRoomInviteEvent, + MatrixRoomJoinEvent, + MatrixRoomLeaveEvent, + MatrixRoomSeenReceiptEvent, + MatrixRoomTopicChangedEvent, + MatrixRoomTypingStartEvent, + MatrixRoomTypingStopEvent, + MatrixSyncEvent, + MatrixUserPresenceEvent, +) + +from platypush.plugins import AsyncRunnablePlugin, action +from platypush.schemas.matrix import ( + MatrixDeviceSchema, + MatrixDownloadedFileSchema, + MatrixEventIdSchema, + MatrixMemberSchema, + MatrixMessagesResponseSchema, + MatrixMyDeviceSchema, + MatrixProfileSchema, + MatrixRoomIdSchema, + MatrixRoomSchema, +) + +from platypush.utils import get_mime_type + +logger = logging.getLogger(__name__) + + +@dataclass +class Credentials: + server_url: str + user_id: str + access_token: str + device_id: str | None + + def to_dict(self) -> dict: + return { + 'server_url': self.server_url, + 'user_id': self.user_id, + 'access_token': self.access_token, + 'device_id': self.device_id, + } + + +class MatrixClient(AsyncClient): + def __init__( + self, + *args, + credentials_file: str, + store_path: str | None = None, + config: AsyncClientConfig | None = None, + autojoin_on_invite=True, + autotrust_devices=False, + autotrust_devices_whitelist: Collection[str] | None = None, + autotrust_rooms_whitelist: Collection[str] | None = None, + autotrust_users_whitelist: Collection[str] | None = None, + **kwargs, + ): + credentials_file = os.path.abspath(os.path.expanduser(credentials_file)) + + if not store_path: + store_path = os.path.join(Config.get('workdir'), 'matrix', 'store') # type: ignore + + assert store_path + store_path = os.path.abspath(os.path.expanduser(store_path)) + pathlib.Path(store_path).mkdir(exist_ok=True, parents=True) + + if not config: + config = AsyncClientConfig( + max_limit_exceeded=0, + max_timeouts=0, + store_sync_tokens=True, + encryption_enabled=True, + ) + + super().__init__(*args, config=config, store_path=store_path, **kwargs) + self.logger = logging.getLogger(self.__class__.__name__) + self._credentials_file = credentials_file + self._autojoin_on_invite = autojoin_on_invite + self._autotrust_devices = autotrust_devices + self._autotrust_devices_whitelist = autotrust_devices_whitelist + self._autotrust_rooms_whitelist = autotrust_rooms_whitelist or set() + self._autotrust_users_whitelist = autotrust_users_whitelist or set() + self._first_sync_performed = asyncio.Event() + self._last_batches_by_room = {} + self._typing_users_by_room = {} + + self._encrypted_attachments_keystore_path = os.path.join( + store_path, 'attachment_keys.json' + ) + self._encrypted_attachments_keystore = {} + self._sync_store_timer: threading.Timer | None = None + keystore = {} + + try: + with open(self._encrypted_attachments_keystore_path, 'r') as f: + keystore = json.load(f) + except (ValueError, OSError): + with open(self._encrypted_attachments_keystore_path, 'w') as f: + f.write(json.dumps({})) + + pathlib.Path(self._encrypted_attachments_keystore_path).touch( + mode=0o600, exist_ok=True + ) + + self._encrypted_attachments_keystore = { + tuple(key.split('|')): data for key, data in keystore.items() + } + + async def _autojoin_room_callback(self, room: MatrixRoom, *_): + await self.join(room.room_id) # type: ignore + + def _load_from_file(self): + if not os.path.isfile(self._credentials_file): + return + + try: + with open(self._credentials_file, 'r') as f: + credentials = json.load(f) + except json.JSONDecodeError: + self.logger.warning( + 'Could not read credentials_file %s - overwriting it', + self._credentials_file, + ) + return + + assert credentials.get('user_id'), 'Missing user_id' + assert credentials.get('access_token'), 'Missing access_token' + + self.access_token = credentials['access_token'] + self.user_id = credentials['user_id'] + self.homeserver = credentials.get('server_url', self.homeserver) + if credentials.get('device_id'): + self.device_id = credentials['device_id'] + + self.load_store() + + async def login( + self, + password: str | None = None, + device_name: str | None = None, + token: str | None = None, + ) -> LoginResponse: + self._load_from_file() + login_res = None + + if self.access_token: + self.load_store() + self.logger.info( + 'Logged in to %s as %s using the stored access token', + self.homeserver, + self.user_id, + ) + + login_res = LoginResponse( + user_id=self.user_id, + device_id=self.device_id, + access_token=self.access_token, + ) + else: + assert self.user, 'No credentials file found and no user provided' + login_args = {'device_name': device_name} + if token: + login_args['token'] = token + else: + assert ( + password + ), 'No credentials file found and no password nor access token provided' + login_args['password'] = password + + login_res = await super().login(**login_args) + assert isinstance(login_res, LoginResponse), f'Failed to login: {login_res}' + self.logger.info(login_res) + + credentials = Credentials( + server_url=self.homeserver, + user_id=login_res.user_id, + access_token=login_res.access_token, + device_id=login_res.device_id, + ) + + with open(self._credentials_file, 'w') as f: + json.dump(credentials.to_dict(), f) + os.chmod(self._credentials_file, 0o600) + + if self.should_upload_keys: + self.logger.info('Uploading encryption keys') + await self.keys_upload() + + self.logger.info('Synchronizing state') + self._first_sync_performed.clear() + self._add_callbacks() + sync_token = self.loaded_sync_token + self.loaded_sync_token = '' + await self.sync(sync_filter={'room': {'timeline': {'limit': 1}}}) + self.loaded_sync_token = sync_token + + self._sync_devices_trust() + self._first_sync_performed.set() + + get_bus().post(MatrixSyncEvent(server_url=self.homeserver)) + self.logger.info('State synchronized') + return login_res + + @logged_in + async def sync(self, *args, **kwargs) -> SyncResponse: + response = await super().sync(*args, **kwargs) + assert isinstance(response, SyncResponse), str(response) + self._last_batches_by_room.update( + { + room_id: { + 'prev_batch': room.timeline.prev_batch, + 'next_batch': response.next_batch, + } + for room_id, room in response.rooms.join.items() + } + ) + + return response + + @logged_in + async def room_messages( + self, room_id: str, start: str | None = None, *args, **kwargs + ) -> RoomMessagesResponse: + if not start: + start = self._last_batches_by_room.get(room_id, {}).get('prev_batch') + assert start, ( + f'No sync batches were found for room {room_id} and no start' + 'batch has been provided' + ) + + response = await super().room_messages(room_id, start, *args, **kwargs) + assert isinstance(response, RoomMessagesResponse), str(response) + return response + + def _sync_devices_trust(self): + all_devices = self.get_devices() + devices_to_trust: Dict[str, OlmDevice] = {} + untrusted_devices = { + device_id: device + for device_id, device in all_devices.items() + if not device.verified + } + + if self._autotrust_devices: + devices_to_trust.update(untrusted_devices) + else: + if self._autotrust_devices_whitelist: + devices_to_trust.update( + { + device_id: device + for device_id, device in all_devices.items() + if device_id in self._autotrust_devices_whitelist + and device_id in untrusted_devices + } + ) + if self._autotrust_rooms_whitelist: + devices_to_trust.update( + { + device_id: device + for room_id, devices in self.get_devices_by_room().items() + for device_id, device in devices.items() # type: ignore + if room_id in self._autotrust_rooms_whitelist + and device_id in untrusted_devices + } + ) + if self._autotrust_users_whitelist: + devices_to_trust.update( + { + device_id: device + for user_id, devices in self.get_devices_by_user().items() + for device_id, device in devices.items() # type: ignore + if user_id in self._autotrust_users_whitelist + and device_id in untrusted_devices + } + ) + + for device in devices_to_trust.values(): + self.verify_device(device) + self.logger.info( + 'Device %s by user %s added to the whitelist', device.id, device.user_id + ) + + def get_devices_by_user( + self, user_id: str | None = None + ) -> Dict[str, Dict[str, OlmDevice]] | Dict[str, OlmDevice]: + devices = {user: devices for user, devices in self.device_store.items()} + + if user_id: + devices = devices.get(user_id, {}) + return devices + + def get_devices(self) -> Dict[str, OlmDevice]: + return { + device_id: device + for _, devices in self.device_store.items() + for device_id, device in devices.items() + } + + def get_device(self, device_id: str) -> OlmDevice | None: + return self.get_devices().get(device_id) + + def get_devices_by_room( + self, room_id: str | None = None + ) -> Dict[str, Dict[str, OlmDevice]] | Dict[str, OlmDevice]: + devices = { + room_id: { + device_id: device + for _, devices in self.room_devices(room_id).items() + for device_id, device in devices.items() + } + for room_id in self.rooms.keys() + } + + if room_id: + devices = devices.get(room_id, {}) + return devices + + def _add_callbacks(self): + self.add_event_callback(self._event_catch_all, Event) + self.add_event_callback(self._on_invite, InviteEvent) # type: ignore + self.add_event_callback(self._on_message, RoomMessageText) # type: ignore + self.add_event_callback(self._on_message, RoomMessageMedia) # type: ignore + self.add_event_callback(self._on_message, RoomEncryptedMedia) # type: ignore + self.add_event_callback(self._on_message, StickerEvent) # type: ignore + self.add_event_callback(self._on_room_member, RoomMemberEvent) # type: ignore + self.add_event_callback(self._on_room_topic_changed, RoomTopicEvent) # type: ignore + self.add_event_callback(self._on_call_invite, CallInviteEvent) # type: ignore + self.add_event_callback(self._on_call_answer, CallAnswerEvent) # type: ignore + self.add_event_callback(self._on_call_hangup, CallHangupEvent) # type: ignore + self.add_event_callback(self._on_unknown_event, UnknownEvent) # type: ignore + self.add_event_callback(self._on_unknown_encrypted_event, UnknownEncryptedEvent) # type: ignore + self.add_event_callback(self._on_unknown_encrypted_event, MegolmEvent) # type: ignore + self.add_to_device_callback(self._on_key_verification_start, KeyVerificationStart) # type: ignore + self.add_to_device_callback(self._on_key_verification_cancel, KeyVerificationCancel) # type: ignore + self.add_to_device_callback(self._on_key_verification_key, KeyVerificationKey) # type: ignore + self.add_to_device_callback(self._on_key_verification_mac, KeyVerificationMac) # type: ignore + self.add_to_device_callback(self._on_key_verification_accept, KeyVerificationAccept) # type: ignore + self.add_ephemeral_callback(self._on_typing, TypingNoticeEvent) # type: ignore + self.add_ephemeral_callback(self._on_receipt, ReceiptEvent) # type: ignore + self.add_presence_callback(self._on_presence, PresenceEvent) # type: ignore + + if self._autojoin_on_invite: + self.add_event_callback(self._autojoin_room_callback, InviteEvent) # type: ignore + + def _sync_store(self): + self.logger.info('Synchronizing keystore') + serialized_keystore = json.dumps( + { + f'{server}|{media_id}': data + for ( + server, + media_id, + ), data in self._encrypted_attachments_keystore.items() + } + ) + + try: + with open(self._encrypted_attachments_keystore_path, 'w') as f: + f.write(serialized_keystore) + finally: + self._sync_store_timer = None + + @alru_cache(maxsize=500) + @client_session + async def get_profile(self, user_id: str | None = None) -> ProfileGetResponse: + """ + Cached version of get_profile. + """ + ret = await super().get_profile(user_id) + assert isinstance( + ret, ProfileGetResponse + ), f'Could not retrieve profile for user {user_id}: {ret.message}' + return ret + + @alru_cache(maxsize=500) + @client_session + async def room_get_state(self, room_id: str) -> RoomGetStateResponse: + """ + Cached version of room_get_state. + """ + ret = await super().room_get_state(room_id) + assert isinstance( + ret, RoomGetStateResponse + ), f'Could not retrieve profile for room {room_id}: {ret.message}' + return ret + + @client_session + async def download( + self, + server_name: str, + media_id: str, + filename: str | None = None, + allow_remote: bool = True, + ): + response = await super().download( + server_name, media_id, filename, allow_remote=allow_remote + ) + + assert isinstance( + response, DownloadResponse + ), f'Could not download media {media_id}: {response}' + + encryption_data = self._encrypted_attachments_keystore.get( + (server_name, media_id) + ) + if encryption_data: + self.logger.info('Decrypting media %s using the available keys', media_id) + response.filename = encryption_data.get('body', response.filename) + response.content_type = encryption_data.get( + 'mimetype', response.content_type + ) + response.body = decrypt_attachment( + response.body, + key=encryption_data.get('key'), + hash=encryption_data.get('hash'), + iv=encryption_data.get('iv'), + ) + + return response + + async def _event_base_args( + self, room: MatrixRoom | None, event: Event | None = None + ) -> dict: + sender_id = getattr(event, 'sender', None) + sender = ( + await self.get_profile(sender_id) if sender_id else None # type: ignore + ) + + return { + 'server_url': self.homeserver, + 'sender_id': sender_id, + 'sender_display_name': sender.displayname if sender else None, + 'sender_avatar_url': sender.avatar_url if sender else None, + **( + { + 'room_id': room.room_id, + 'room_name': room.name, + 'room_topic': room.topic, + } + if room + else {} + ), + 'server_timestamp': ( + datetime.datetime.fromtimestamp(event.server_timestamp / 1000) + if event and getattr(event, 'server_timestamp', None) + else None + ), + } + + async def _event_catch_all(self, room: MatrixRoom, event: Event): + self.logger.debug('Received event on room %s: %r', room.room_id, event) + + async def _on_invite(self, room: MatrixRoom, event: RoomMessageText): + get_bus().post( + MatrixRoomInviteEvent( + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_message( + self, + room: MatrixRoom, + event: RoomMessageText | RoomMessageMedia | RoomEncryptedMedia | StickerEvent, + ): + if self._first_sync_performed.is_set(): + evt_type = MatrixMessageEvent + evt_args = { + 'body': event.body, + 'url': getattr(event, 'url', None), + **(await self._event_base_args(room, event)), + } + + if isinstance(event, (RoomMessageMedia, RoomEncryptedMedia, StickerEvent)): + evt_args['url'] = event.url + + if isinstance(event, RoomEncryptedMedia): + evt_args['thumbnail_url'] = event.thumbnail_url + evt_args['mimetype'] = event.mimetype + self._store_encrypted_media_keys(event) + if isinstance(event, RoomMessageFormatted): + evt_args['format'] = event.format + evt_args['formatted_body'] = event.formatted_body + + if isinstance(event, (RoomMessageImage, RoomEncryptedImage)): + evt_type = MatrixMessageImageEvent + elif isinstance(event, (RoomMessageAudio, RoomEncryptedAudio)): + evt_type = MatrixMessageAudioEvent + elif isinstance(event, (RoomMessageVideo, RoomEncryptedVideo)): + evt_type = MatrixMessageVideoEvent + elif isinstance(event, (RoomMessageFile, RoomEncryptedFile)): + evt_type = MatrixMessageFileEvent + + get_bus().post(evt_type(**evt_args)) + + def _store_encrypted_media_keys(self, event: RoomEncryptedMedia): + url = event.url.strip('/') + parsed_url = urlparse(url) + homeserver = parsed_url.netloc.strip('/') + media_key = (homeserver, parsed_url.path.strip('/')) + + self._encrypted_attachments_keystore[media_key] = { + 'url': url, + 'body': event.body, + 'key': event.key['k'], + 'hash': event.hashes['sha256'], + 'iv': event.iv, + 'homeserver': homeserver, + 'mimetype': event.mimetype, + } + + if not self._sync_store_timer: + self._sync_store_timer = threading.Timer(5, self._sync_store) + self._sync_store_timer.start() + + async def _on_room_member(self, room: MatrixRoom, event: RoomMemberEvent): + evt_type = None + if event.membership == 'join': + evt_type = MatrixRoomJoinEvent + elif event.membership == 'leave': + evt_type = MatrixRoomLeaveEvent + + if evt_type and self._first_sync_performed.is_set(): + get_bus().post( + evt_type( + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_room_topic_changed(self, room: MatrixRoom, event: RoomTopicEvent): + if self._first_sync_performed.is_set(): + get_bus().post( + MatrixRoomTopicChangedEvent( + **(await self._event_base_args(room, event)), + topic=event.topic, + ) + ) + + async def _on_call_invite(self, room: MatrixRoom, event: CallInviteEvent): + if self._first_sync_performed.is_set(): + get_bus().post( + MatrixCallInviteEvent( + call_id=event.call_id, + version=event.version, + invite_validity=event.lifetime / 1000.0, + sdp=event.offer.get('sdp'), + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_call_answer(self, room: MatrixRoom, event: CallAnswerEvent): + if self._first_sync_performed.is_set(): + get_bus().post( + MatrixCallAnswerEvent( + call_id=event.call_id, + version=event.version, + sdp=event.answer.get('sdp'), + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_call_hangup(self, room: MatrixRoom, event: CallHangupEvent): + if self._first_sync_performed.is_set(): + get_bus().post( + MatrixCallHangupEvent( + call_id=event.call_id, + version=event.version, + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_room_created(self, room: MatrixRoom, event: RoomCreateEvent): + get_bus().post( + MatrixRoomCreatedEvent( + **(await self._event_base_args(room, event)), + ) + ) + + def _get_sas(self, event): + sas = self.key_verifications.get(event.transaction_id) + if not sas: + self.logger.debug( + 'Received a key verification event with no associated transaction ID' + ) + + return sas + + async def _on_key_verification_start(self, event: KeyVerificationStart): + self.logger.info(f'Received a key verification request from {event.sender}') + + if 'emoji' not in event.short_authentication_string: + self.logger.warning( + 'Only emoji verification is supported, but the verifying device ' + 'provided the following authentication methods: %r', + event.short_authentication_string, + ) + return + + sas = self._get_sas(event) + if not sas: + return + + rs = await self.accept_key_verification(sas.transaction_id) + assert not isinstance( + rs, ToDeviceError + ), f'accept_key_verification failed: {rs}' + + rs = await self.to_device(sas.share_key()) + assert not isinstance(rs, ToDeviceError), f'Shared key exchange failed: {rs}' + + async def _on_key_verification_accept(self, event: KeyVerificationAccept): + self.logger.info('Key verification from device %s accepted', event.sender) + + async def _on_key_verification_cancel(self, event: KeyVerificationCancel): + self.logger.info( + 'The device %s cancelled a key verification request. ' 'Reason: %s', + event.sender, + event.reason, + ) + + async def _on_key_verification_key(self, event: KeyVerificationKey): + sas = self._get_sas(event) + if not sas: + return + + self.logger.info( + 'Received emoji verification from device %s: %s', + event.sender, + sas.get_emoji(), + ) + + rs = await self.confirm_short_auth_string(sas.transaction_id) + assert not isinstance( + rs, ToDeviceError + ), f'confirm_short_auth_string failed: {rs}' + + async def _on_key_verification_mac(self, event: KeyVerificationMac): + self.logger.info('Received MAC verification request from %s', event.sender) + sas = self._get_sas(event) + if not sas: + return + + try: + mac = sas.get_mac() + except LocalProtocolError as e: + self.logger.warning( + 'Verification from %s cancelled or unexpected protocol error. ' + 'Reason: %s', + e, + event.sender, + ) + return + + rs = await self.to_device(mac) + assert not isinstance( + rs, ToDeviceError + ), f'Sending of the verification MAC to {event.sender} failed: {rs}' + + self.logger.info('This device has been successfully verified!') + + async def _on_room_upgrade(self, room: MatrixRoom, event: RoomUpgradeEvent): + self.logger.info( + 'The room %s has been moved to %s', room.room_id, event.replacement_room + ) + + await self.room_leave(room.room_id) + await self.join(event.replacement_room) + + async def _on_typing(self, room: MatrixRoom, event: TypingNoticeEvent): + users = set(event.users) + typing_users = self._typing_users_by_room.get(room.room_id, set()) + start_typing_users = users.difference(typing_users) + stop_typing_users = typing_users.difference(users) + + for user in start_typing_users: + event.sender = user # type: ignore + get_bus().post( + MatrixRoomTypingStartEvent( + **(await self._event_base_args(room, event)), # type: ignore + sender=user, + ) + ) + + for user in stop_typing_users: + event.sender = user # type: ignore + get_bus().post( + MatrixRoomTypingStopEvent( + **(await self._event_base_args(room, event)), # type: ignore + ) + ) + + self._typing_users_by_room[room.room_id] = users + + async def _on_receipt(self, room: MatrixRoom, event: ReceiptEvent): + if self._first_sync_performed.is_set(): + for receipt in event.receipts: + event.sender = receipt.user_id # type: ignore + get_bus().post( + MatrixRoomSeenReceiptEvent( + **(await self._event_base_args(room, event)), # type: ignore + ) + ) + + async def _on_presence(self, event: PresenceEvent): + if self._first_sync_performed.is_set(): + last_active = ( + ( + datetime.datetime.now() + - datetime.timedelta(seconds=event.last_active_ago / 1000) + ) + if event.last_active_ago + else None + ) + + event.sender = event.user_id # type: ignore + get_bus().post( + MatrixUserPresenceEvent( + **(await self._event_base_args(None, event)), # type: ignore + is_active=event.currently_active or False, + last_active=last_active, + ) + ) + + async def _on_unknown_encrypted_event( + self, room: MatrixRoom, event: UnknownEncryptedEvent | MegolmEvent + ): + if self._first_sync_performed.is_set(): + body = getattr(event, 'ciphertext', '') + get_bus().post( + MatrixEncryptedMessageEvent( + body=body, + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_unknown_event(self, room: MatrixRoom, event: UnknownEvent): + evt = None + + if event.type == 'm.reaction' and self._first_sync_performed.is_set(): + # Get the ID of the event this was a reaction to + relation_dict = event.source.get('content', {}).get('m.relates_to', {}) + reacted_to = relation_dict.get('event_id') + if reacted_to and relation_dict.get('rel_type') == 'm.annotation': + event_response = await self.room_get_event(room.room_id, reacted_to) + + if isinstance(event_response, RoomGetEventError): + self.logger.warning( + 'Error getting event that was reacted to (%s)', reacted_to + ) + else: + evt = MatrixReactionEvent( + in_response_to_event_id=event_response.event.event_id, + **(await self._event_base_args(room, event)), + ) + + if evt: + get_bus().post(evt) + else: + self.logger.info( + 'Received an unknown event on room %s: %r', room.room_id, event + ) + + async def upload_file( + self, + file: str, + name: str | None = None, + content_type: str | None = None, + encrypt: bool = False, + ): + file = os.path.expanduser(file) + file_stat = await aiofiles.os.stat(file) + + async with aiofiles.open(file, 'rb') as f: + return await super().upload( + f, # type: ignore + content_type=( + content_type or get_mime_type(file) or 'application/octet-stream' + ), + filename=name or os.path.basename(file), + encrypt=encrypt, + filesize=file_stat.st_size, + ) + + +class MatrixPlugin(AsyncRunnablePlugin): + """ + Matrix chat integration. + + Requires: + + * **matrix-nio** (``pip install 'matrix-nio[e2e]'``) + * **libolm** (on Debian ```apt-get install libolm-devel``, on Arch + ``pacman -S libolm``) + * **async_lru** (``pip install async_lru``) + + Note that ``libolm`` and the ``[e2e]`` module are only required if you want + E2E encryption support. + + Unless you configure the extension to use the token of an existing trusted + device, it is recommended that you mark the virtual device used by this + integration as trusted through a device that is already trusted. You may + encounter errors when sending or receiving messages on encrypted rooms if + your user has some untrusted devices. The easiest way to mark the device as + trusted is the following: + + - Configure the integration with your credentials and start Platypush. + - Use the same credentials to log in through a Matrix app or web client + (Element, Hydrogen, etc.) that has already been trusted. + - You should see a notification that prompts you to review the + untrusted devices logged in to your account. Dismiss it for now - + that verification path is currently broken on the underlying library + used by this integration. + - Instead, select a room that you have already joined, select the list + of users in the room and select yourself. + - In the _Security_ section, you should see that at least one device is + marked as unverified, and you can start the verification process by + clicking on it. + - Select "_Verify through emoji_". A list of emojis should be prompted. + Optionally, verify the logs of the application to check that you see + the same list. Then confirm that you see the same emojis, and your + device will be automatically marked as trusted. + + All the URLs returned by actions and events on this plugin are in the + ``mxc:///`` format. You can either convert them to HTTP + through the :meth:`.mxc_to_http` method, or download them through the + :meth:`.download` method. + + Triggers: + + * :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a + message is received. + * :class:`platypush.message.event.matrix.MatrixMessageImageEvent`: when a + message containing an image is received. + * :class:`platypush.message.event.matrix.MatrixMessageAudioEvent`: when a + message containing an audio file is received. + * :class:`platypush.message.event.matrix.MatrixMessageVideoEvent`: when a + message containing a video file is received. + * :class:`platypush.message.event.matrix.MatrixMessageFileEvent`: when a + message containing a generic file is received. + * :class:`platypush.message.event.matrix.MatrixSyncEvent`: when the + startup synchronization has been completed and the plugin is ready to + use. + * :class:`platypush.message.event.matrix.MatrixRoomCreatedEvent`: when + a room is created. + * :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a + user joins a room. + * :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a + user leaves a room. + * :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when + the user is invited to a room. + * :class:`platypush.message.event.matrix.MatrixRoomTopicChangedEvent`: + when the topic/title of a room changes. + * :class:`platypush.message.event.matrix.MatrixCallInviteEvent`: when + the user is invited to a call. + * :class:`platypush.message.event.matrix.MatrixCallAnswerEvent`: when a + called user wishes to pick the call. + * :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a + called user exits the call. + * :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`: + when a message is received but the client doesn't have the E2E keys + to decrypt it, or encryption has not been enabled. + * :class:`platypush.message.event.matrix.MatrixRoomTypingStartEvent`: + when a user in a room starts typing. + * :class:`platypush.message.event.matrix.MatrixRoomTypingStopEvent`: + when a user in a room stops typing. + * :class:`platypush.message.event.matrix.MatrixRoomSeenReceiptEvent`: + when the last message seen by a user in a room is updated. + * :class:`platypush.message.event.matrix.MatrixUserPresenceEvent`: + when a user comes online or goes offline. + + """ + + def __init__( + self, + server_url: str = 'https://matrix.to', + user_id: str | None = None, + password: str | None = None, + access_token: str | None = None, + device_name: str | None = 'platypush', + device_id: str | None = None, + download_path: str | None = None, + autojoin_on_invite: bool = True, + autotrust_devices: bool = False, + autotrust_devices_whitelist: Collection[str] | None = None, + autotrust_users_whitelist: Collection[str] | None = None, + autotrust_rooms_whitelist: Collection[str] | None = None, + **kwargs, + ): + """ + Authentication requires user_id/password on the first login. + Afterwards, session credentials are stored under + ``<$PLATYPUSH_WORKDIR>/matrix/credentials.json`` (default: + ``~/.local/share/platypush/matrix/credentials.json``), and you can + remove the cleartext credentials from your configuration file. + + Otherwise, if you already have an ``access_token``, you can set the + associated field instead of using ``password``. This may be required if + the user has 2FA enabled. + + :param server_url: Default Matrix instance base URL (default: ``https://matrix.to``). + :param user_id: user_id, in the format ``@user:example.org``, or just + the username if the account is hosted on the same server configured in + the ``server_url``. + :param password: User password. + :param access_token: User access token. + :param device_name: The name of this device/connection (default: ``platypush``). + :param device_id: Use an existing ``device_id`` for the sessions. + :param download_path: The folder where downloaded media will be saved + (default: ``~/Downloads``). + :param autojoin_on_invite: Whether the account should automatically + join rooms upon invite. If false, then you may want to implement your + own logic in an event hook when a + :class:`platypush.message.event.matrix.MatrixRoomInviteEvent` event is + received, and call the :meth:`.join` method if required. + :param autotrust_devices: If set to True, the plugin will automatically + trust the devices on encrypted rooms. Set this property to True + only if you only plan to use a bot on trusted rooms. Note that if + no automatic trust mechanism is set you may need to explicitly + create your logic for trusting users - either with a hook when + :class:`platypush.message.event.matrix.MatrixSyncEvent` is + received, or when a room is joined, or before sending a message. + :param autotrust_devices_whitelist: Automatically trust devices with IDs + IDs provided in this list. + :param autotrust_users_whitelist: Automatically trust devices from the + user IDs provided in this list. + :param autotrust_rooms_whitelist: Automatically trust devices on the + room IDs provided in this list. + """ + super().__init__(**kwargs) + if not (server_url.startswith('http://') or server_url.startswith('https://')): + server_url = f'https://{server_url}' + self._server_url = server_url + server_name = self._server_url.split('/')[2].split(':')[0] + + if user_id and not re.match(user_id, '^@[a-zA-Z0-9.-_]+:.+'): + user_id = f'@{user_id}:{server_name}' + + self._user_id = user_id + self._password = password + self._access_token = access_token + self._device_name = device_name + self._device_id = device_id + self._download_path = download_path or os.path.join( + os.path.expanduser('~'), 'Downloads' + ) + + self._autojoin_on_invite = autojoin_on_invite + self._autotrust_devices = autotrust_devices + self._autotrust_devices_whitelist = set(autotrust_devices_whitelist or []) + self._autotrust_users_whitelist = set(autotrust_users_whitelist or []) + self._autotrust_rooms_whitelist = set(autotrust_rooms_whitelist or []) + self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore + self._credentials_file = os.path.join(self._workdir, 'credentials.json') + self._processed_responses = {} + self._client = self._get_client() + pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True) + + def _get_client(self) -> MatrixClient: + return MatrixClient( + homeserver=self._server_url, + user=self._user_id, + credentials_file=self._credentials_file, + autojoin_on_invite=self._autojoin_on_invite, + autotrust_devices=self._autotrust_devices, + autotrust_devices_whitelist=self._autotrust_devices_whitelist, + autotrust_rooms_whitelist=self._autotrust_rooms_whitelist, + autotrust_users_whitelist=self._autotrust_users_whitelist, + device_id=self._device_id, + ) + + @property + def client(self) -> MatrixClient: + if not self._client: + self._client = self._get_client() + return self._client + + async def _login(self) -> MatrixClient: + await self.client.login( + password=self._password, + device_name=self._device_name, + token=self._access_token, + ) + + return self.client + + async def listen(self): + while not self.should_stop(): + await self._login() + + try: + await self.client.sync_forever(timeout=30000, full_state=True) + except KeyboardInterrupt: + pass + except Exception as e: + self.logger.exception(e) + self.logger.info('Waiting 10 seconds before reconnecting') + await asyncio.sleep(10) + finally: + try: + await self.client.close() + finally: + self._client = None + + def _loop_execute(self, coro: Coroutine): + assert self._loop, 'The loop is not running' + + try: + ret = asyncio.run_coroutine_threadsafe(coro, self._loop).result() + except OlmUnverifiedDeviceError as e: + raise AssertionError(str(e)) + + assert not isinstance(ret, ErrorResponse), ret.message + if hasattr(ret, 'transport_response'): + response = ret.transport_response + assert response.ok, f'{coro} failed with status {response.status}' + + return ret + + def _process_local_attachment(self, attachment: str, room_id: str) -> dict: + attachment = os.path.expanduser(attachment) + assert os.path.isfile(attachment), f'{attachment} is not a valid file' + + filename = os.path.basename(attachment) + mime_type = get_mime_type(attachment) or 'application/octet-stream' + message_type = mime_type.split('/')[0] + if message_type not in {'audio', 'video', 'image'}: + message_type = 'text' + + encrypted = self.get_room(room_id).output.get('encrypted', False) # type: ignore + url = self.upload( + attachment, name=filename, content_type=mime_type, encrypt=encrypted + ).output # type: ignore + + return { + 'url': url, + 'msgtype': 'm.' + message_type, + 'body': filename, + 'info': { + 'size': os.stat(attachment).st_size, + 'mimetype': mime_type, + }, + } + + def _process_remote_attachment(self, attachment: str) -> dict: + parsed_url = urlparse(attachment) + server = parsed_url.netloc.strip('/') + media_id = parsed_url.path.strip('/') + + response = self._loop_execute(self.client.download(server, media_id)) + + content_type = response.content_type + message_type = content_type.split('/')[0] + if message_type not in {'audio', 'video', 'image'}: + message_type = 'text' + + return { + 'url': attachment, + 'msgtype': 'm.' + message_type, + 'body': response.filename, + 'info': { + 'size': len(response.body), + 'mimetype': content_type, + }, + } + + def _process_attachment(self, attachment: str, room_id: str): + if attachment.startswith('mxc://'): + return self._process_remote_attachment(attachment) + return self._process_local_attachment(attachment, room_id=room_id) + + @action + def send_message( + self, + room_id: str, + message_type: str = 'text', + body: str | None = None, + attachment: str | None = None, + tx_id: str | None = None, + ignore_unverified_devices: bool = False, + ): + """ + Send a message to a room. + + :param room_id: Room ID. + :param body: Message body. + :param attachment: Path to a local file to send as an attachment, or + URL of an existing Matrix media ID in the format + ``mxc:///``. If the attachment is a local file, + the file will be automatically uploaded, ``message_type`` will be + automatically inferred from the file and the ``body`` will be + replaced by the filename. + :param message_type: Message type. Supported: `text`, `audio`, `video`, + `image`. Default: `text`. + :param tx_id: Unique transaction ID to associate to this message. + :param ignore_unverified_devices: If true, unverified devices will be + ignored. Otherwise, if the room is encrypted and it contains + devices that haven't been marked as trusted, the message + delivery may fail (default: False). + :return: .. schema:: matrix.MatrixEventIdSchema + """ + content = { + 'msgtype': 'm.' + message_type, + 'body': body, + } + + if attachment: + content.update(self._process_attachment(attachment, room_id=room_id)) + + ret = self._loop_execute( + self.client.room_send( + message_type='m.room.message', + room_id=room_id, + tx_id=tx_id, + ignore_unverified_devices=ignore_unverified_devices, + content=content, + ) + ) + + ret = self._loop_execute(ret.transport_response.json()) + return MatrixEventIdSchema().dump(ret) + + @action + def get_profile(self, user_id: str): + """ + Retrieve the details about a user. + + :param user_id: User ID. + :return: .. schema:: matrix.MatrixProfileSchema + """ + profile = self._loop_execute(self.client.get_profile(user_id)) # type: ignore + profile.user_id = user_id + return MatrixProfileSchema().dump(profile) + + @action + def get_room(self, room_id: str): + """ + Retrieve the details about a room. + + :param room_id: room ID. + :return: .. schema:: matrix.MatrixRoomSchema + """ + response = self._loop_execute(self.client.room_get_state(room_id)) # type: ignore + room_args = {'room_id': room_id, 'own_user_id': None, 'encrypted': False} + room_params = {} + + for evt in response.events: + if evt.get('type') == 'm.room.create': + room_args['own_user_id'] = evt.get('content', {}).get('creator') + elif evt.get('type') == 'm.room.encryption': + room_args['encrypted'] = True + elif evt.get('type') == 'm.room.name': + room_params['name'] = evt.get('content', {}).get('name') + elif evt.get('type') == 'm.room.topic': + room_params['topic'] = evt.get('content', {}).get('topic') + + room = MatrixRoom(**room_args) + for k, v in room_params.items(): + setattr(room, k, v) + return MatrixRoomSchema().dump(room) + + @action + def get_messages( + self, + room_id: str, + start: str | None = None, + end: str | None = None, + backwards: bool = True, + limit: int = 10, + ): + """ + Retrieve a list of messages from a room. + + :param room_id: Room ID. + :param start: Start retrieving messages from this batch ID (default: + latest batch returned from a call to ``sync``). + :param end: Retrieving messages until this batch ID. + :param backwards: Set to True if you want to retrieve messages starting + from the most recent, in descending order (default). Otherwise, the + first returned message will be the oldest and messages will be + returned in ascending order. + :param limit: Maximum number of messages to be returned (default: 10). + :return: .. schema:: matrix.MatrixMessagesResponseSchema + """ + response = self._loop_execute( + self.client.room_messages( + room_id, + start=start, + end=end, + limit=limit, + direction=( + MessageDirection.back if backwards else MessageDirection.front + ), + ) + ) + + response.chunk = [m for m in response.chunk if isinstance(m, RoomMessage)] + return MatrixMessagesResponseSchema().dump(response) + + @action + def get_my_devices(self): + """ + Get the list of devices associated to the current user. + + :return: .. schema:: matrix.MatrixMyDeviceSchema(many=True) + """ + response = self._loop_execute(self.client.devices()) + return MatrixMyDeviceSchema().dump(response.devices, many=True) + + @action + def get_device(self, device_id: str): + """ + Get the info about a device given its ID. + + :return: .. schema:: matrix.MatrixDeviceSchema + """ + return MatrixDeviceSchema().dump(self._get_device(device_id)) + + @action + def update_device(self, device_id: str, display_name: str | None = None): + """ + Update information about a user's device. + + :param display_name: New display name. + :return: .. schema:: matrix.MatrixDeviceSchema + """ + content = {} + if display_name: + content['display_name'] = display_name + + self._loop_execute(self.client.update_device(device_id, content)) + return MatrixDeviceSchema().dump(self._get_device(device_id)) + + @action + def delete_devices( + self, + devices: Sequence[str], + username: str | None = None, + password: str | None = None, + ): + """ + Delete a list of devices from the user's authorized list and invalidate + their access tokens. + + :param devices: List of devices that should be deleted. + :param username: Username, if the server requires authentication upon + device deletion. + :param password: User password, if the server requires authentication + upon device deletion. + """ + auth = {} + if username and password: + auth = {'type': 'm.login.password', 'user': username, 'password': password} + + self._loop_execute(self.client.delete_devices([*devices], auth=auth)) + + @action + def get_joined_rooms(self): + """ + Retrieve the rooms that the user has joined. + + :return: .. schema:: matrix.MatrixRoomSchema(many=True) + """ + response = self._loop_execute(self.client.joined_rooms()) + return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore + + @action + def get_room_members(self, room_id: str): + """ + Retrieve the list of users joined into a room. + + :param room_id: The room ID. + :return: .. schema:: matrix.MatrixMemberSchema(many=True) + """ + response = self._loop_execute(self.client.joined_members(room_id)) + return MatrixMemberSchema().dump(response.members, many=True) + + @action + def room_alias_to_id(self, alias: str) -> str: + """ + Convert a room alias (in the format ``#alias:matrix.example.org``) to a + room ID (in the format ``!aBcDeFgHiJkMnO:matrix.example.org'). + + :param alias: The room alias. + :return: The room ID, as a string. + """ + response = self._loop_execute(self.client.room_resolve_alias(alias)) + return response.room_id + + @action + def add_room_alias(self, room_id: str, alias: str): + """ + Add an alias to a room. + + :param room_id: An existing room ID. + :param alias: The room alias. + """ + self._loop_execute(self.client.room_put_alias(alias, room_id)) + + @action + def delete_room_alias(self, alias: str): + """ + Delete a room alias. + + :param alias: The room alias. + """ + self._loop_execute(self.client.room_delete_alias(alias)) + + @action + def upload_keys(self): + """ + Synchronize the E2EE keys with the homeserver. + """ + self._loop_execute(self.client.keys_upload()) + + def _get_device(self, device_id: str) -> OlmDevice: + device = self.client.get_device(device_id) + assert device, f'No such device_id: {device_id}' + return device + + @action + def trust_device(self, device_id: str): + """ + Mark a device as trusted. + + :param device_id: Device ID. + """ + device = self._get_device(device_id) + self.client.verify_device(device) + + @action + def untrust_device(self, device_id: str): + """ + Mark a device as untrusted. + + :param device_id: Device ID. + """ + device = self._get_device(device_id) + self.client.unverify_device(device) + + @action + def mxc_to_http(self, url: str, homeserver: str | None = None) -> str: + """ + Convert a Matrix URL (in the format ``mxc://server/media_id``) to an + HTTP URL. + + Note that invoking this function on a URL containing encrypted content + (i.e. a URL containing media sent to an encrypted room) will provide a + URL that points to encrypted content. The best way to deal with + encrypted media is by using :meth:`.download` to download the media + locally. + + :param url: The MXC URL to be converted. + :param homeserver: The hosting homeserver (default: the same as the URL). + :return: The converted HTTP(s) URL. + """ + http_url = Api.mxc_to_http(url, homeserver=homeserver) + assert http_url, f'Could not convert URL {url}' + return http_url + + @action + def download( + self, + url: str, + download_path: str | None = None, + filename: str | None = None, + allow_remote=True, + ): + """ + Download a file given its Matrix URL. + + Note that URLs that point to encrypted resources will be automatically + decrypted only if they were received on a room joined by this account. + + :param url: Matrix URL, in the format ``mxc:///``. + :param download_path: Override the default ``download_path`` (output + directory for the downloaded file). + :param filename: Name of the output file (default: inferred from the + remote resource). + :param allow_remote: Indicates to the server that it should not attempt + to fetch the media if it is deemed remote. This is to prevent + routing loops where the server contacts itself. + :return: .. schema:: matrix.MatrixDownloadedFileSchema + """ + parsed_url = urlparse(url) + server = parsed_url.netloc.strip('/') + media_id = parsed_url.path.strip('/') + + response = self._loop_execute( + self.client.download( + server, media_id, filename=filename, allow_remote=allow_remote + ) + ) + + if not download_path: + download_path = self._download_path + if not filename: + filename = response.filename or media_id + + outfile = os.path.join(str(download_path), str(filename)) + pathlib.Path(download_path).mkdir(parents=True, exist_ok=True) + + with open(outfile, 'wb') as f: + f.write(response.body) + + return MatrixDownloadedFileSchema().dump( + { + 'url': url, + 'path': outfile, + 'size': len(response.body), + 'content_type': response.content_type, + } + ) + + @action + def upload( + self, + file: str, + name: str | None = None, + content_type: str | None = None, + encrypt: bool = False, + ) -> str: + """ + Upload a file to the server. + + :param file: Path to the file to upload. + :param name: Filename to be used for the remote file (default: same as + the local file). + :param content_type: Specify a content type for the file (default: + inferred from the file's extension and content). + :param encrypt: Encrypt the file (default: False). + :return: The Matrix URL of the uploaded resource. + """ + rs = self._loop_execute( + self.client.upload_file(file, name, content_type, encrypt) + ) + + return rs[0].content_uri + + @action + def create_room( + self, + name: str | None = None, + alias: str | None = None, + topic: str | None = None, + is_public: bool = False, + is_direct: bool = False, + federate: bool = True, + encrypted: bool = False, + invite_users: Sequence[str] = (), + ): + """ + Create a new room on the server. + + :param name: Room name. + :param alias: Custom alias for the canonical name. For example, if set + to ``foo``, the alias for this room will be + ``#foo:matrix.example.org``. + :param topic: Room topic. + :param is_public: Set to True if you want the room to be public and + discoverable (default: False). + :param is_direct: Set to True if this should be considered a direct + room with only one user (default: False). + :param federate: Whether you want to allow users from other servers to + join the room (default: True). + :param encrypted: Whether the room should be encrypted (default: False). + :param invite_users: A list of user IDs to invite to the room. + :return: .. schema:: matrix.MatrixRoomIdSchema + """ + rs = self._loop_execute( + self.client.room_create( + name=name, + alias=alias, + topic=topic, + is_direct=is_direct, + federate=federate, + invite=invite_users, + visibility=( + RoomVisibility.public if is_public else RoomVisibility.private + ), + initial_state=[ + { + 'type': 'm.room.encryption', + 'content': { + 'algorithm': 'm.megolm.v1.aes-sha2', + }, + } + ] + if encrypted + else (), + ) + ) + + return MatrixRoomIdSchema().dump(rs) + + @action + def invite(self, room_id: str, user_id: str): + """ + Invite a user to a room. + + :param room_id: Room ID. + :param user_id: User ID. + """ + self._loop_execute(self.client.room_invite(room_id, user_id)) + + @action + def kick(self, room_id: str, user_id: str, reason: str | None = None): + """ + Kick a user out of a room. + + :param room_id: Room ID. + :param user_id: User ID. + :param reason: Optional reason. + """ + self._loop_execute(self.client.room_kick(room_id, user_id, reason)) + + @action + def ban(self, room_id: str, user_id: str, reason: str | None = None): + """ + Ban a user from a room. + + :param room_id: Room ID. + :param user_id: User ID. + :param reason: Optional reason. + """ + self._loop_execute(self.client.room_ban(room_id, user_id, reason)) + + @action + def unban(self, room_id: str, user_id: str): + """ + Remove a user ban from a room. + + :param room_id: Room ID. + :param user_id: User ID. + """ + self._loop_execute(self.client.room_unban(room_id, user_id)) + + @action + def join(self, room_id: str): + """ + Join a room. + + :param room_id: Room ID. + """ + self._loop_execute(self.client.join(room_id)) + + @action + def leave(self, room_id: str): + """ + Leave a joined room. + + :param room_id: Room ID. + """ + self._loop_execute(self.client.room_leave(room_id)) + + @action + def forget(self, room_id: str): + """ + Leave a joined room and forget its data as well as all the messages. + + If all the users leave a room, that room will be marked for deletion by + the homeserver. + + :param room_id: Room ID. + """ + self._loop_execute(self.client.room_forget(room_id)) + + @action + def set_display_name(self, display_name: str): + """ + Set/change the display name for the current user. + + :param display_name: New display name. + """ + self._loop_execute(self.client.set_displayname(display_name)) + + @action + def set_avatar(self, url: str): + """ + Set/change the avatar URL for the current user. + + :param url: New avatar URL. It must be a valid ``mxc://`` link. + """ + self._loop_execute(self.client.set_avatar(url)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/matrix/manifest.yaml b/platypush/plugins/matrix/manifest.yaml new file mode 100644 index 0000000000..b348205275 --- /dev/null +++ b/platypush/plugins/matrix/manifest.yaml @@ -0,0 +1,50 @@ +manifest: + events: + platypush.message.event.matrix.MatrixMessageEvent: when a message is + received. + platypush.message.event.matrix.MatrixMessageImageEvent: when a message + containing an image is received. + platypush.message.event.matrix.MatrixMessageAudioEvent: when a message + containing an audio file is received. + platypush.message.event.matrix.MatrixMessageVideoEvent: when a message + containing a video file is received. + platypush.message.event.matrix.MatrixMessageFileEvent: when a message + containing a generic file is received. + platypush.message.event.matrix.MatrixSyncEvent: when the startup + synchronization has been completed and the plugin is ready to use. + platypush.message.event.matrix.MatrixRoomCreatedEvent: when a room is + created. + platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a + room. + platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a + room. + platypush.message.event.matrix.MatrixRoomInviteEvent: when the user is + invited to a room. + platypush.message.event.matrix.MatrixRoomTopicChangedEvent: when the + topic/title of a room changes. + platypush.message.event.matrix.MatrixCallInviteEvent: when the user is + invited to a call. + platypush.message.event.matrix.MatrixCallAnswerEvent: when a called user + wishes to pick the call. + platypush.message.event.matrix.MatrixCallHangupEvent: when a called user + exits the call. + platypush.message.event.matrix.MatrixEncryptedMessageEvent: when a message + is received but the client doesn't have the E2E keys to decrypt it, or + encryption has not been enabled. + platypush.message.event.matrix.MatrixRoomTypingStartEvent: when a user in a + room starts typing. + platypush.message.event.matrix.MatrixRoomTypingStopEvent: when a user in a + room stops typing. + platypush.message.event.matrix.MatrixRoomSeenReceiptEvent: when the last + message seen by a user in a room is updated. + platypush.message.event.matrix.MatrixUserPresenceEvent: when a user comes + online or goes offline. + apt: + - libolm-devel + pacman: + - libolm + pip: + - matrix-nio[e2e] + - async_lru + package: platypush.plugins.matrix + type: plugin diff --git a/platypush/schemas/__init__.py b/platypush/schemas/__init__.py index ca2805627c..ce157319b5 100644 --- a/platypush/schemas/__init__.py +++ b/platypush/schemas/__init__.py @@ -6,7 +6,7 @@ from dateutil.tz import tzutc from marshmallow import fields -class StrippedString(fields.Function): # lgtm [py/missing-call-to-init] +class StrippedString(fields.Function): # lgtm [py/missing-call-to-init] def __init__(self, *args, **kwargs): kwargs['serialize'] = self._strip kwargs['deserialize'] = self._strip @@ -21,7 +21,17 @@ class StrippedString(fields.Function): # lgtm [py/missing-call-to-init] return value.strip() -class DateTime(fields.Function): # lgtm [py/missing-call-to-init] +class Function(fields.Function): # lgtm [py/missing-call-to-init] + def _get_attr(self, obj, attr: str, _recursive=True): + if self.attribute and _recursive: + return self._get_attr(obj, self.attribute, False) + if hasattr(obj, attr): + return getattr(obj, attr) + elif hasattr(obj, 'get'): + return obj.get(attr) + + +class DateTime(Function): # lgtm [py/missing-call-to-init] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.metadata = { @@ -30,7 +40,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init] } def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: - value = normalize_datetime(obj.get(attr)) + value = normalize_datetime(self._get_attr(obj, attr)) if value: return value.isoformat() @@ -38,7 +48,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init] return normalize_datetime(value) -class Date(fields.Function): # lgtm [py/missing-call-to-init] +class Date(Function): # lgtm [py/missing-call-to-init] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.metadata = { @@ -47,7 +57,7 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init] } def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: - value = normalize_datetime(obj.get(attr)) + value = normalize_datetime(self._get_attr(obj, attr)) if value: return date(value.year, value.month, value.day).isoformat() @@ -56,10 +66,12 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init] return date.fromtimestamp(dt.timestamp()) -def normalize_datetime(dt: Union[str, date, datetime]) -> Optional[Union[date, datetime]]: +def normalize_datetime( + dt: Optional[Union[str, date, datetime]] +) -> Optional[Union[date, datetime]]: if not dt: return - if isinstance(dt, datetime) or isinstance(dt, date): + if isinstance(dt, (datetime, date)): return dt try: diff --git a/platypush/schemas/matrix.py b/platypush/schemas/matrix.py new file mode 100644 index 0000000000..cb9ecc2390 --- /dev/null +++ b/platypush/schemas/matrix.py @@ -0,0 +1,385 @@ +from marshmallow import fields +from marshmallow.schema import Schema + +from platypush.schemas import DateTime + + +class MillisecondsTimestamp(DateTime): + def _get_attr(self, *args, **kwargs): + value = super()._get_attr(*args, **kwargs) + if isinstance(value, int): + value = float(value / 1000) + return value + + +class MatrixEventIdSchema(Schema): + event_id = fields.String( + required=True, + metadata={ + 'description': 'Event ID', + 'example': '$24KT_aQz6sSKaZH8oTCibRTl62qywDgQXMpz5epXsW5', + }, + ) + + +class MatrixRoomIdSchema(Schema): + room_id = fields.String( + required=True, + metadata={ + 'description': 'Room ID', + 'example': '!aBcDeFgHiJkMnO:matrix.example.org', + }, + ) + + +class MatrixProfileSchema(Schema): + user_id = fields.String( + required=True, + metadata={ + 'description': 'User ID', + 'example': '@myuser:matrix.example.org', + }, + ) + + display_name = fields.String( + attribute='displayname', + metadata={ + 'description': 'User display name', + 'example': 'Foo Bar', + }, + ) + + avatar_url = fields.URL( + metadata={ + 'description': 'User avatar URL', + 'example': 'mxc://matrix.example.org/AbCdEfG0123456789', + } + ) + + +class MatrixMemberSchema(MatrixProfileSchema): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields['display_name'].attribute = 'display_name' + + +class MatrixRoomSchema(Schema): + room_id = fields.String( + required=True, + metadata={ + 'description': 'Room ID', + 'example': '!aBcDeFgHiJkMnO:matrix.example.org', + }, + ) + + name = fields.String( + metadata={ + 'description': 'Room name', + 'example': 'My Room', + } + ) + + display_name = fields.String( + metadata={ + 'description': 'Room display name', + 'example': 'My Room', + } + ) + + topic = fields.String( + metadata={ + 'description': 'Room topic', + 'example': 'My Room Topic', + } + ) + + avatar_url = fields.URL( + attribute='room_avatar_url', + metadata={ + 'description': 'Room avatar URL', + 'example': 'mxc://matrix.example.org/AbCdEfG0123456789', + }, + ) + + owner_id = fields.String( + attribute='own_user_id', + metadata={ + 'description': 'Owner user ID', + 'example': '@myuser:matrix.example.org', + }, + ) + + encrypted = fields.Bool() + + +class MatrixDeviceSchema(Schema): + device_id = fields.String( + required=True, + attribute='id', + metadata={ + 'description': 'ABCDEFG', + }, + ) + + user_id = fields.String( + required=True, + metadata={ + 'description': 'User ID associated to the device', + 'example': '@myuser:matrix.example.org', + }, + ) + + display_name = fields.String( + metadata={ + 'description': 'Display name of the device', + 'example': 'Element Android', + }, + ) + + blacklisted = fields.Boolean() + deleted = fields.Boolean(default=False) + ignored = fields.Boolean() + verified = fields.Boolean() + + keys = fields.Dict( + metadata={ + 'description': 'Encryption keys supported by the device', + 'example': { + 'curve25519': 'BtlB0vaQmtYFsvOYkmxyzw9qP5yGjuAyRh4gXh3q', + 'ed25519': 'atohIK2FeVlYoY8xxpZ1bhDbveD+HA2DswNFqUxP', + }, + }, + ) + + +class MatrixMyDeviceSchema(Schema): + device_id = fields.String( + required=True, + attribute='id', + metadata={ + 'description': 'ABCDEFG', + }, + ) + + display_name = fields.String( + metadata={ + 'description': 'Device display name', + 'example': 'My Device', + } + ) + + last_seen_ip = fields.String( + metadata={ + 'description': 'Last IP associated to this device', + 'example': '1.2.3.4', + } + ) + + last_seen_date = DateTime( + metadata={ + 'description': 'The last time that the device was reported online', + 'example': '2022-07-23T17:20:01.254223', + } + ) + + +class MatrixDownloadedFileSchema(Schema): + url = fields.String( + metadata={ + 'description': 'Matrix URL of the original resource', + 'example': 'mxc://matrix.example.org/YhQycHvFOvtiDDbEeWWtEhXx', + }, + ) + + path = fields.String( + metadata={ + 'description': 'Local path where the file has been saved', + 'example': '/home/user/Downloads/image.png', + } + ) + + content_type = fields.String( + metadata={ + 'description': 'Content type of the downloaded file', + 'example': 'image/png', + } + ) + + size = fields.Int( + metadata={ + 'description': 'Length in bytes of the output file', + 'example': 1024, + } + ) + + +class MatrixMessageSchema(Schema): + event_id = fields.String( + required=True, + metadata={ + 'description': 'Event ID associated to this message', + 'example': '$2eOQ5ueafANj91GnPCRkRUOOjM7dI5kFDOlfMNCD2ly', + }, + ) + + room_id = fields.String( + required=True, + metadata={ + 'description': 'The ID of the room containing the message', + 'example': '!aBcDeFgHiJkMnO:matrix.example.org', + }, + ) + + user_id = fields.String( + required=True, + attribute='sender', + metadata={ + 'description': 'ID of the user who sent the message', + 'example': '@myuser:matrix.example.org', + }, + ) + + body = fields.String( + required=True, + metadata={ + 'description': 'Message body', + 'example': 'Hello world!', + }, + ) + + format = fields.String( + metadata={ + 'description': 'Message format', + 'example': 'markdown', + }, + ) + + formatted_body = fields.String( + metadata={ + 'description': 'Formatted body', + 'example': '**Hello world!**', + }, + ) + + url = fields.String( + metadata={ + 'description': 'mxc:// URL if this message contains an attachment', + 'example': 'mxc://matrix.example.org/oarGdlpvcwppARPjzNlmlXkD', + }, + ) + + content_type = fields.String( + attribute='mimetype', + metadata={ + 'description': 'If the message contains an attachment, this field ' + 'will contain its MIME type', + 'example': 'image/jpeg', + }, + ) + + transaction_id = fields.String( + metadata={ + 'description': 'Set if this message a unique transaction_id associated', + 'example': 'mQ8hZR6Dx8I8YDMwONYmBkf7lTgJSMV/ZPqosDNM', + }, + ) + + decrypted = fields.Bool( + metadata={ + 'description': 'True if the message was encrypted and has been ' + 'successfully decrypted', + }, + ) + + verified = fields.Bool( + metadata={ + 'description': 'True if this is an encrypted message coming from a ' + 'verified source' + }, + ) + + hashes = fields.Dict( + metadata={ + 'description': 'If the message has been decrypted, this field ' + 'contains a mapping of its hashes', + 'example': {'sha256': 'yoQLQwcURq6/bJp1xQ/uhn9Z2xeA27KhMhPd/mfT8tR'}, + }, + ) + + iv = fields.String( + metadata={ + 'description': 'If the message has been decrypted, this field ' + 'contains the encryption initial value', + 'example': 'NqJMMdijlLvAAAAAAAAAAA', + }, + ) + + key = fields.Dict( + metadata={ + 'description': 'If the message has been decrypted, this field ' + 'contains the encryption/decryption key', + 'example': { + 'alg': 'A256CTR', + 'ext': True, + 'k': 'u6jjAyNvJoBHE55P5ZfvX49m3oSt9s_L4PSQdprRSJI', + 'key_ops': ['encrypt', 'decrypt'], + 'kty': 'oct', + }, + }, + ) + + timestamp = MillisecondsTimestamp( + required=True, + attribute='server_timestamp', + metadata={ + 'description': 'When the event was registered on the server', + 'example': '2022-07-23T17:20:01.254223', + }, + ) + + +class MatrixMessagesResponseSchema(Schema): + messages = fields.Nested( + MatrixMessageSchema(), + many=True, + required=True, + attribute='chunk', + ) + + start = fields.String( + required=True, + nullable=True, + metadata={ + 'description': 'Pointer to the first message. It can be used as a ' + '``start``/``end`` for another ``get_messages`` query.', + 'example': 's10226_143893_619_3648_5951_5_555_7501_0', + }, + ) + + end = fields.String( + required=True, + nullable=True, + metadata={ + 'description': 'Pointer to the last message. It can be used as a ' + '``start``/``end`` for another ``get_messages`` query.', + 'example': 't2-10202_143892_626_3663_5949_6_558_7501_0', + }, + ) + + start_time = MillisecondsTimestamp( + required=True, + nullable=True, + metadata={ + 'description': 'The oldest timestamp of the returned messages', + 'example': '2022-07-23T16:20:01.254223', + }, + ) + + end_time = MillisecondsTimestamp( + required=True, + nullable=True, + metadata={ + 'description': 'The newest timestamp of the returned messages', + 'example': '2022-07-23T18:20:01.254223', + }, + ) diff --git a/setup.py b/setup.py index a5e379d734..5961bc60f2 100755 --- a/setup.py +++ b/setup.py @@ -268,5 +268,7 @@ setup( 'ngrok': ['pyngrok'], # Support for IRC integration 'irc': ['irc'], + # Support for the Matrix integration + 'matrix': ['matrix-nio'], }, )