diff --git a/docs/source/conf.py b/docs/source/conf.py index a3f641d2..00097679 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -138,15 +138,12 @@ latex_elements = { # The paper size ('letterpaper' or 'a4paper'). # # 'papersize': 'letterpaper', - # The font size ('10pt', '11pt' or '12pt'). # # 'pointsize': '10pt', - # Additional stuff for the LaTeX preamble. # # 'preamble': '', - # Latex figure (float) alignment # # 'figure_align': 'htbp', @@ -156,8 +153,7 @@ latex_elements = { # (source start file, target name, title, # author, documentclass [howto, manual, or own class]). latex_documents = [ - (master_doc, 'platypush.tex', 'platypush Documentation', - 'BlackLight', 'manual'), + (master_doc, 'platypush.tex', 'platypush Documentation', 'BlackLight', 'manual'), ] @@ -165,10 +161,7 @@ latex_documents = [ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). -man_pages = [ - (master_doc, 'platypush', 'platypush Documentation', - [author], 1) -] +man_pages = [(master_doc, 'platypush', 'platypush Documentation', [author], 1)] # -- Options for Texinfo output ---------------------------------------------- @@ -177,9 +170,15 @@ man_pages = [ # (source start file, target name, title, author, # dir menu entry, description, category) texinfo_documents = [ - (master_doc, 'platypush', 'platypush Documentation', - author, 'platypush', 'One line description of project.', - 'Miscellaneous'), + ( + master_doc, + 'platypush', + 'platypush Documentation', + author, + 'platypush', + 'One line description of project.', + 'Miscellaneous', + ), ] @@ -199,99 +198,101 @@ autodoc_default_options = { 'inherited-members': True, } -autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers', - 'google.assistant.embedded', - 'google.assistant.library', - 'google.assistant.library.event', - 'google.assistant.library.file_helpers', - 'google.oauth2.credentials', - 'oauth2client', - 'apiclient', - 'tenacity', - 'smartcard', - 'Leap', - 'oauth2client', - 'rtmidi', - 'bluetooth', - 'gevent.wsgi', - 'Adafruit_IO', - 'pyperclip', - 'pydbus', - 'inputs', - 'inotify', - 'omxplayer', - 'plexapi', - 'cwiid', - 'sounddevice', - 'soundfile', - 'numpy', - 'cv2', - 'nfc', - 'ndef', - 'bcrypt', - 'google', - 'feedparser', - 'kafka', - 'googlesamples', - 'icalendar', - 'httplib2', - 'mpd', - 'serial', - 'pyHS100', - 'grpc', - 'envirophat', - 'gps', - 'picamera', - 'pmw3901', - 'PIL', - 'croniter', - 'pyaudio', - 'avs', - 'PyOBEX', - 'todoist', - 'trello', - 'telegram', - 'telegram.ext', - 'pyfirmata2', - 'cups', - 'graphyte', - 'cpuinfo', - 'psutil', - 'openzwave', - 'deepspeech', - 'wave', - 'pvporcupine ', - 'pvcheetah', - 'pyotp', - 'linode_api4', - 'pyzbar', - 'tensorflow', - 'keras', - 'pandas', - 'samsungtvws', - 'paramiko', - 'luma', - 'zeroconf', - 'dbus', - 'gi', - 'gi.repository', - 'twilio', - 'Adafruit_Python_DHT', - 'RPi.GPIO', - 'RPLCD', - 'imapclient', - 'pysmartthings', - 'aiohttp', - 'watchdog', - 'pyngrok', - 'irc', - 'irc.bot', - 'irc.strings', - 'irc.client', - 'irc.connection', - 'irc.events', - 'defusedxml', - ] +autodoc_mock_imports = [ + 'googlesamples.assistant.grpc.audio_helpers', + 'google.assistant.embedded', + 'google.assistant.library', + 'google.assistant.library.event', + 'google.assistant.library.file_helpers', + 'google.oauth2.credentials', + 'oauth2client', + 'apiclient', + 'tenacity', + 'smartcard', + 'Leap', + 'oauth2client', + 'rtmidi', + 'bluetooth', + 'gevent.wsgi', + 'Adafruit_IO', + 'pyperclip', + 'pydbus', + 'inputs', + 'inotify', + 'omxplayer', + 'plexapi', + 'cwiid', + 'sounddevice', + 'soundfile', + 'numpy', + 'cv2', + 'nfc', + 'ndef', + 'bcrypt', + 'google', + 'feedparser', + 'kafka', + 'googlesamples', + 'icalendar', + 'httplib2', + 'mpd', + 'serial', + 'pyHS100', + 'grpc', + 'envirophat', + 'gps', + 'picamera', + 'pmw3901', + 'PIL', + 'croniter', + 'pyaudio', + 'avs', + 'PyOBEX', + 'todoist', + 'trello', + 'telegram', + 'telegram.ext', + 'pyfirmata2', + 'cups', + 'graphyte', + 'cpuinfo', + 'psutil', + 'openzwave', + 'deepspeech', + 'wave', + 'pvporcupine ', + 'pvcheetah', + 'pyotp', + 'linode_api4', + 'pyzbar', + 'tensorflow', + 'keras', + 'pandas', + 'samsungtvws', + 'paramiko', + 'luma', + 'zeroconf', + 'dbus', + 'gi', + 'gi.repository', + 'twilio', + 'Adafruit_Python_DHT', + 'RPi.GPIO', + 'RPLCD', + 'imapclient', + 'pysmartthings', + 'aiohttp', + 'watchdog', + 'pyngrok', + 'irc', + 'irc.bot', + 'irc.strings', + 'irc.client', + 'irc.connection', + 'irc.events', + 'defusedxml', + 'nio', +] sys.path.insert(0, os.path.abspath('../..')) diff --git a/platypush/message/event/matrix.py b/platypush/message/event/matrix.py index d1ac3f8d..bc3e6fdb 100644 --- a/platypush/message/event/matrix.py +++ b/platypush/message/event/matrix.py @@ -67,6 +67,92 @@ class MatrixMessageEvent(MatrixEvent): super().__init__(*args, body=body, **kwargs) +class MatrixMediaMessageEvent(MatrixMessageEvent): + """ + Event triggered when a media message is received on a subscribed room. + """ + + def __init__(self, *args, url: str, **kwargs): + """ + :param url: The URL of the media file. + """ + super().__init__(*args, url=url, **kwargs) + + +class MatrixStickerEvent(MatrixMediaMessageEvent): + """ + Event triggered when a sticker is sent to a room. + """ + + +class MatrixReactionEvent(MatrixEvent): + """ + Event triggered when a user submits a reaction to an event. + """ + + def __init__(self, *args, in_response_to_event_id: str, **kwargs): + """ + :param in_response_to_event_id: The ID of the URL related to the reaction. + """ + super().__init__( + *args, in_response_to_event_id=in_response_to_event_id, **kwargs + ) + + +class MatrixEncryptedMessageEvent(MatrixMessageEvent): + """ + Event triggered when a message is received but the client doesn't + have the E2E keys to decrypt it, or encryption has not been enabled. + """ + + +class MatrixCallEvent(MatrixEvent): + """ + Base class for Matrix call events. + """ + + def __init__( + self, *args, call_id: str, version: int, sdp: str | None = None, **kwargs + ): + """ + :param call_id: The unique ID of the call. + :param version: An increasing integer representing the version of the call. + :param sdp: SDP text of the session description. + """ + super().__init__(*args, call_id=call_id, version=version, sdp=sdp, **kwargs) + + +class MatrixCallInviteEvent(MatrixCallEvent): + """ + Event triggered when the user is invited to a call. + """ + + def __init__(self, *args, invite_validity: float | None = None, **kwargs): + """ + :param invite_validity: For how long the invite will be valid, in seconds. + :param sdp: SDP text of the session description. + """ + super().__init__(*args, invite_validity=invite_validity, **kwargs) + + +class MatrixCallAnswerEvent(MatrixCallEvent): + """ + Event triggered by the callee when they wish to answer the call. + """ + + +class MatrixCallHangupEvent(MatrixCallEvent): + """ + Event triggered when a participant in the call exists. + """ + + +class MatrixRoomCreatedEvent(MatrixEvent): + """ + Event triggered when a room is created. + """ + + class MatrixRoomJoinEvent(MatrixEvent): """ Event triggered when a user joins a room. @@ -81,17 +167,11 @@ class MatrixRoomLeaveEvent(MatrixEvent): class MatrixRoomInviteEvent(MatrixEvent): """ - Event triggered when a user is invited to a room. + Event triggered when the user is invited to a room. """ -class MatrixRoomInviteMeEvent(MatrixEvent): - """ - Event triggered when the currently logged in user is invited to a room. - """ - - -class MatrixRoomTopicChangeEvent(MatrixEvent): +class MatrixRoomTopicChangedEvent(MatrixEvent): """ Event triggered when the topic/title of a room changes. """ diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py index 42f9fc7d..ad22f439 100644 --- a/platypush/plugins/matrix/__init__.py +++ b/platypush/plugins/matrix/__init__.py @@ -1,488 +1,701 @@ import datetime import json +import logging import multiprocessing import os import pathlib -import requests -from abc import ABC, abstractmethod -from urllib.parse import urljoin -from typing import Optional, Collection, Dict, Tuple, Any +import re -from platypush.config import Config -from platypush.context import get_bus -from platypush.message.event.matrix import ( - MatrixEvent, - MatrixRoomTopicChangeEvent, - MatrixMessageEvent, - MatrixRoomJoinEvent, - MatrixRoomLeaveEvent, - MatrixRoomInviteEvent, - MatrixRoomInviteMeEvent, +from aiohttp import ClientConnectionError, ServerDisconnectedError +from dataclasses import dataclass +from functools import wraps +from typing import Callable + +from async_lru import alru_cache +from nio import ( + AsyncClient, + AsyncClientConfig, + CallAnswerEvent, + CallHangupEvent, + CallInviteEvent, + DevicesError, + Event, + InviteNameEvent, + JoinedRoomsError, + KeyVerificationStart, + LoginResponse, + MatrixRoom, + MegolmEvent, + ProfileGetResponse, + RoomCreateEvent, + RoomGetEventError, + RoomGetStateError, + RoomGetStateResponse, + RoomMemberEvent, + RoomMessageText, + RoomMessageMedia, + RoomTopicEvent, + RoomUpgradeEvent, + StickerEvent, + UnknownEncryptedEvent, + UnknownEvent, ) +from nio.client.async_client import client_session + +from platypush.config import Config +from platypush.context import get_bus, get_or_create_event_loop +from platypush.message.event.matrix import ( + MatrixCallAnswerEvent, + MatrixCallHangupEvent, + MatrixCallInviteEvent, + MatrixEncryptedMessageEvent, + MatrixMediaMessageEvent, + MatrixMessageEvent, + MatrixReactionEvent, + MatrixRoomCreatedEvent, + MatrixRoomInviteEvent, + MatrixRoomJoinEvent, + MatrixRoomLeaveEvent, + MatrixRoomTopicChangedEvent, + MatrixStickerEvent, +) + +from platypush.message.response import Response from platypush.plugins import RunnablePlugin, action +from platypush.schemas.matrix import ( + MatrixDeviceSchema, + MatrixProfileSchema, + MatrixRoomSchema, +) + +from platypush.utils import set_thread_name + +logger = logging.getLogger(__name__) -class RetrieveWorker(ABC): - def __init__(self, server_url: str, access_token: str): - self._server_url = server_url - self._access_token = access_token +@dataclass +class Credentials: + server_url: str + user_id: str + access_token: str + device_id: str | None - @abstractmethod - def _url(self, id: int | str) -> str: - raise NotImplementedError() - - @abstractmethod - def _process_response(self, rs: dict | Collection[dict]) -> dict: - raise NotImplementedError() - - def __call__(self, id: str) -> Tuple[str, dict]: - url = urljoin(self._server_url, self._url(id)) - rs = requests.get( - url, - headers={ - 'Authorization': f'Bearer {self._access_token}', - }, - ) - - rs.raise_for_status() - return (id, self._process_response(rs.json())) - - -class UserRetrieveWorker(RetrieveWorker): - def _url(self, id: str) -> str: - return f'/_matrix/client/r0/profile/{id}' - - def _process_response(self, rs: dict) -> dict: + def to_dict(self) -> dict: return { - 'display_name': rs.get('displayname'), - 'avatar_url': rs.get('avatar_url'), + 'server_url': self.server_url, + 'user_id': self.user_id, + 'access_token': self.access_token, + 'device_id': self.device_id, } -class RoomRetrieveWorker(RetrieveWorker): - def _url(self, id: str) -> str: - return f'/_matrix/client/v3/rooms/{id}/state' +def _action_wrapper(f: Callable) -> Callable: + @wraps(f) + def _wrapper(*args, **kwargs): + try: + return f(*args, **kwargs) + except Exception as e: + logger.exception(e) + return Response(errors=[str(e)]) - def _process_response(self, rs: Collection[dict]) -> dict: - info = {} - for event in rs: - event_type = event.get('type') - if event_type == 'm.room.name': - info['name'] = event.get('content', {}).get('name') - elif event_type == 'm.room.topic': - info['name'] = event.get('content', {}).get('topic') + return _wrapper - return info + +class MatrixClient(AsyncClient): + def __init__( + self, + *args, + credentials_file: str, + store_path: str | None = None, + config: AsyncClientConfig | None = None, + autojoin_on_invite=False, + **kwargs, + ): + credentials_file = os.path.abspath(os.path.expanduser(credentials_file)) + if not store_path: + store_path = os.path.join(Config.get('workdir'), 'matrix', 'store') # type: ignore + if store_path: + store_path = os.path.abspath(os.path.expanduser(store_path)) + pathlib.Path(store_path).mkdir(exist_ok=True, parents=True) + if not config: + config = AsyncClientConfig( + max_limit_exceeded=0, + max_timeouts=0, + store_sync_tokens=True, + encryption_enabled=True, + ) + + super().__init__(*args, config=config, store_path=store_path, **kwargs) + self.logger = logging.getLogger(self.__class__.__name__) + self._credentials_file = credentials_file + self._autojoin_on_invite = autojoin_on_invite + + async def _autojoin_room_callback(self, room: MatrixRoom, *_): + await self.join(room.room_id) # type: ignore + + def _load_from_file(self): + if not os.path.isfile(self._credentials_file): + return + + try: + with open(self._credentials_file, 'r') as f: + credentials = json.load(f) + except json.JSONDecodeError: + self.logger.warning( + 'Could not read credentials_file %s - overwriting it', + self._credentials_file, + ) + return + + assert credentials.get('user_id'), 'Missing user_id' + assert credentials.get('access_token'), 'Missing access_token' + + self.access_token = credentials['access_token'] + self.user_id = credentials['user_id'] + self.homeserver = credentials.get('server_url', self.homeserver) + if credentials.get('device_id'): + self.device_id = credentials['device_id'] + + self.load_store() + + async def login( + self, + password: str | None = None, + device_name: str | None = None, + token: str | None = None, + ) -> LoginResponse: + self._load_from_file() + login_res = None + + if self.access_token: + self.load_store() + self.logger.info( + 'Logged in to %s as %s using the stored access token', + self.homeserver, + self.user_id, + ) + + if self.should_upload_keys: + self.logger.info('Uploading encryption keys') + await self.keys_upload() + + login_res = LoginResponse( + user_id=self.user_id, + device_id=self.device_id, + access_token=self.access_token, + ) + else: + assert self.user, 'No credentials file found and no user provided' + login_args = {'device_name': device_name} + if token: + login_args['token'] = token + else: + assert ( + password + ), 'No credentials file found and no password nor access token provided' + login_args['password'] = password + + login_res = await super().login(**login_args) + assert isinstance(login_res, LoginResponse), f'Failed to login: {login_res}' + self.logger.info(login_res) + + credentials = Credentials( + server_url=self.homeserver, + user_id=login_res.user_id, + access_token=login_res.access_token, + device_id=login_res.device_id, + ) + + with open(self._credentials_file, 'w') as f: + json.dump(credentials.to_dict(), f) + os.chmod(self._credentials_file, 0o600) + + self.logger.info('Synchronizing rooms') + sync_token = self.loaded_sync_token + self.loaded_sync_token = '' + await self.sync(sync_filter={'room': {'timeline': {'limit': 1}}}) + + self.loaded_sync_token = sync_token + self.logger.info('Rooms synchronized') + self._add_callbacks() + return login_res + + def _add_callbacks(self): + self.add_event_callback(self._event_catch_all, Event) + self.add_event_callback(self._on_invite, InviteNameEvent) # type: ignore + self.add_event_callback(self._on_room_message, RoomMessageText) # type: ignore + self.add_event_callback(self._on_media_message, RoomMessageMedia) # type: ignore + self.add_event_callback(self._on_room_member, RoomMemberEvent) # type: ignore + self.add_event_callback(self._on_room_topic_changed, RoomTopicEvent) # type: ignore + self.add_event_callback(self._on_call_invite, CallInviteEvent) # type: ignore + self.add_event_callback(self._on_call_answer, CallAnswerEvent) # type: ignore + self.add_event_callback(self._on_call_hangup, CallHangupEvent) # type: ignore + self.add_event_callback(self._on_sticker_message, StickerEvent) # type: ignore + self.add_event_callback(self._on_unknown_event, UnknownEvent) # type: ignore + self.add_event_callback(self._on_unknown_encrypted_event, UnknownEncryptedEvent) # type: ignore + self.add_event_callback(self._on_unknown_encrypted_event, MegolmEvent) # type: ignore + self.add_to_device_callback(self._on_key_verification_start, KeyVerificationStart) # type: ignore + + if self._autojoin_on_invite: + self.add_event_callback(self._autojoin_room_callback, InviteNameEvent) # type: ignore + + @alru_cache(maxsize=500) + @client_session + async def get_profile(self, user_id: str | None = None) -> ProfileGetResponse: + """ + Cached version of get_profile. + """ + ret = await super().get_profile(user_id) + assert isinstance( + ret, ProfileGetResponse + ), f'Could not retrieve profile for user {user_id}: {ret.message}' + return ret + + @alru_cache(maxsize=500) + @client_session + async def room_get_state(self, room_id: str) -> RoomGetStateResponse: + """ + Cached version of room_get_state. + """ + ret = await super().room_get_state(room_id) + assert isinstance( + ret, RoomGetStateResponse + ), f'Could not retrieve profile for room {room_id}: {ret.message}' + return ret + + async def _event_base_args( + self, room: MatrixRoom, event: Event | None = None + ) -> dict: + sender_id = event.sender if event else None + sender = ( + await self.get_profile(sender_id) if sender_id else None # type: ignore + ) + + return { + 'server_url': self.homeserver, + 'sender_id': sender_id, + 'sender_display_name': sender.displayname if sender else None, + 'sender_avatar_url': sender.avatar_url if sender else None, + 'room_id': room.room_id, + 'room_name': room.name, + 'room_topic': room.topic, + 'server_timestamp': ( + datetime.datetime.fromtimestamp(event.server_timestamp / 1000) + if event and getattr(event, 'server_timestamp', None) + else None + ), + } + + async def _event_catch_all(self, room: MatrixRoom, event: Event): + self.logger.debug('Received event on room %s: %r', room.room_id, event) + + async def _on_invite(self, room: MatrixRoom, event: RoomMessageText): + get_bus().post( + MatrixRoomInviteEvent( + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_room_message(self, room: MatrixRoom, event: RoomMessageText): + get_bus().post( + MatrixMessageEvent( + **(await self._event_base_args(room, event)), + body=event.body, + ) + ) + + async def _on_room_member(self, room: MatrixRoom, event: RoomMemberEvent): + evt_type = None + if event.membership == 'join': + evt_type = MatrixRoomJoinEvent + elif event.membership == 'leave': + evt_type = MatrixRoomLeaveEvent + + if evt_type: + get_bus().post( + evt_type( + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_room_topic_changed(self, room: MatrixRoom, event: RoomTopicEvent): + get_bus().post( + MatrixRoomTopicChangedEvent( + **(await self._event_base_args(room, event)), + topic=event.topic, + ) + ) + + async def _on_call_invite(self, room: MatrixRoom, event: CallInviteEvent): + get_bus().post( + MatrixCallInviteEvent( + call_id=event.call_id, + version=event.version, + invite_validity=event.lifetime / 1000.0, + sdp=event.offer.get('sdp'), + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_call_answer(self, room: MatrixRoom, event: CallAnswerEvent): + get_bus().post( + MatrixCallAnswerEvent( + call_id=event.call_id, + version=event.version, + sdp=event.answer.get('sdp'), + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_call_hangup(self, room: MatrixRoom, event: CallHangupEvent): + get_bus().post( + MatrixCallHangupEvent( + call_id=event.call_id, + version=event.version, + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_room_created(self, room: MatrixRoom, event: RoomCreateEvent): + get_bus().post( + MatrixRoomCreatedEvent( + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_media_message(self, room: MatrixRoom, event: RoomMessageMedia): + get_bus().post( + MatrixMediaMessageEvent( + url=event.url, + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_sticker_message(self, room: MatrixRoom, event: StickerEvent): + get_bus().post( + MatrixStickerEvent( + url=event.url, + **(await self._event_base_args(room, event)), + ) + ) + + def _on_key_verification_start(self, event: KeyVerificationStart): + assert self.olm, 'OLM state machine not initialized' + print('************ HERE') + print(event) + self.olm.handle_key_verification(event) + + async def _on_room_upgrade(self, room: MatrixRoom, event: RoomUpgradeEvent): + self.logger.info( + 'The room %s has been upgraded to %s', room.room_id, event.replacement_room + ) + + await self.room_leave(room.room_id) + await self.join(event.replacement_room) + + async def _on_unknown_encrypted_event( + self, room: MatrixRoom, event: UnknownEncryptedEvent | MegolmEvent + ): + body = getattr(event, 'ciphertext', '') + get_bus().post( + MatrixEncryptedMessageEvent( + body=body, + **(await self._event_base_args(room, event)), + ) + ) + + async def _on_unknown_event(self, room: MatrixRoom, event: UnknownEvent): + evt = None + + if event.type == 'm.reaction': + # Get the ID of the event this was a reaction to + relation_dict = event.source.get('content', {}).get('m.relates_to', {}) + reacted_to = relation_dict.get('event_id') + if reacted_to and relation_dict.get('rel_type') == 'm.annotation': + event_response = await self.room_get_event(room.room_id, reacted_to) + + if isinstance(event_response, RoomGetEventError): + self.logger.warning( + 'Error getting event that was reacted to (%s)', reacted_to + ) + else: + evt = MatrixReactionEvent( + in_response_to_event_id=event_response.event.event_id, + **(await self._event_base_args(room, event)), + ) + + if evt: + get_bus().post(evt) + else: + self.logger.info( + 'Received an unknown event on room %s: %r', room.room_id, event + ) class MatrixPlugin(RunnablePlugin): """ Matrix chat integration. + Requires: + + * **matrix-nio** (``pip install 'matrix-nio[e2e]'``) + * **libolm** (on Debian ```apt-get install libolm-devel``, on Arch + ``pacman -S libolm``) + * **async_lru** (``pip install async_lru``) + + Note that ``libolm`` and the ``[e2e]`` module are only required if you want E2E encryption + support. + Triggers: * :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received. + * :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when a media message is received. + * :class:`platypush.message.event.matrix.MatrixRoomCreatedEvent`: when a room is created. * :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a user joins a room. * :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a user leaves a room. - * :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when a user (other than the - currently logged one) is invited to a room. - * :class:`platypush.message.event.matrix.MatrixRoomMeInviteEvent`: when the currently logged in - user is invited to a room. - * :class:`platypush.message.event.matrix.MatrixRoomTopicChangeEvent`: when the topic/title of a room changes. + * :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when the user is invited to a room. + * :class:`platypush.message.event.matrix.MatrixRoomTopicChangedEvent`: when the topic/title of a room changes. + * :class:`platypush.message.event.matrix.MatrixCallInviteEvent`: when the user is invited to a call. + * :class:`platypush.message.event.matrix.MatrixCallAnswerEvent`: when a called user wishes to pick the call. + * :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a called user exits the call. + * :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a sticker is sent to a room. + * :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`: when a message is received but the + client doesn't have the E2E keys to decrypt it, or encryption has not been enabled. """ def __init__( self, server_url: str = 'https://matrix.to', - username: str | None = None, + user_id: str | None = None, password: str | None = None, access_token: str | None = None, + device_name: str | None = 'platypush', + device_id: str | None = None, autojoin_on_invite: bool = False, **kwargs, ): """ - Authentication requires either username/password or an access token. + Authentication requires user_id/password on the first login. + Afterwards, session credentials are stored under + ``<$PLATYPUSH_WORKDIR>/matrix/credentials.json`` (default: + ``~/.local/share/platypush/matrix/credentials.json``), and you can + remove the cleartext credentials from your configuration file. - If you don't want to provide cleartext credentials in the configuration, you can - retrieve an access token offline through the following request:: - - curl -XPOST '{"type":"m.login.password", "user":"username", "password":"password"}' \ - "https://matrix.example.com/_matrix/client/r0/login" - - This may be required if the user or the instance enforce 2FA. + Otherwise, if you already have an ``access_token``, you can set the + associated field instead of using ``password``. This may be required if + the user has 2FA enabled. :param server_url: Default Matrix instance base URL (default: ``https://matrix.to``). - :param username: Default username. Provide either username/password _or_ an access token. - :param password: Default password. Provide either username/password _or_ an access token. - :param access_token: Default access token. Provide either username/password _or_ an access token. + :param user_id: user_id, in the format ``@user:example.org``, or just the username if the + account is hosted on the same server configured in the ``server_url``. + :param password: User password. + :param access_token: User access token. + :param device_name: The name of this device/connection (default: ``platypush``). + :param device_id: Use an existing ``device_id`` for the sessions. :param autojoin_on_invite: Whether the account should automatically join rooms upon invite. If false (default value), then you may want to implement your own - logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteMeEvent` + logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteEvent` event is received, and call the :meth:`.join` method if required. """ super().__init__(**kwargs) + if not (server_url.startswith('http://') or server_url.startswith('https://')): + server_url = f'https://{server_url}' self._server_url = server_url - self._user_id = None + server_name = self._server_url.split('/')[2].split(':')[0] + + if user_id and not re.match(user_id, '^@[a-zA-Z0-9.-_]+:.+'): + user_id = f'@{user_id}:{server_name}' + + self._matrix_proc: multiprocessing.Process | None = None + self._user_id = user_id + self._password = password + self._access_token = access_token + self._device_name = device_name + self._device_id = device_id self._autojoin_on_invite = autojoin_on_invite + self._event_loop = get_or_create_event_loop() self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore + self._credentials_file = os.path.join(self._workdir, 'credentials.json') + self._client = self._get_client() pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True) - self._sessions_file = os.path.join(self._workdir, 'sessions.json') - self._credentials_file = os.path.join(self._workdir, 'credentials.json') - self._users_cache_file = os.path.join(self._workdir, 'users.json') - self._rooms_cache_file = os.path.join(self._workdir, 'rooms.json') - self._users_cache: Dict[str, dict] = {} - self._rooms_cache: Dict[str, dict] = {} - self._set_credentials(username, password, access_token, overwrite=True) - - def _set_credentials( - self, - username: str | None = None, - password: str | None = None, - access_token: str | None = None, - overwrite: bool = False, - ): - if username or overwrite: - self._username = username - if password or overwrite: - self._password = password - if access_token or overwrite: - self._access_token = access_token - - def _execute(self, url: str, method: str = 'get', **kwargs): - if self._access_token: - kwargs['headers'] = { - 'Authorization': f'Bearer {self._access_token}', - **kwargs.get('headers', {}), - } - - url = urljoin(self._server_url, f'/_matrix/client/{url.lstrip("/")}') - req_method = getattr(requests, method.lower()) - rs = req_method(url, **kwargs) - rs.raise_for_status() - rs = rs.json() - assert not rs.get('error'), rs.get('error') - return rs - - def _save_credentials(self, credentials: dict): - with open(self._credentials_file, 'w') as f: - json.dump(credentials, f) - - def _refresh_user_id(self): - devices = self._execute('/v3/devices').get('devices', []) - assert devices, 'The user is not logged into any devices' - self._user_id = devices[0]['user_id'] - - @action - def login( - self, - server_url: str | None = None, - username: str | None = None, - password: str | None = None, - access_token: str | None = None, - ): - """ - Login to an instance if username/password/access_token were not specified in the plugin - configuration. Otherwise, change the currently logged user or instance. - - :param server_url: New Matrix instance base URL. - :param username: New username. - :param password: New user password. - :param access_token: New access token. - """ - self._server_url = server_url or self._server_url - self._set_credentials(username, password, access_token, overwrite=False) - - if self._access_token: - self._refresh_user_id() - elif self._username and self._password: - rs = self._execute( - '/r0/login', - method='post', - json={ - 'type': 'm.login.password', - 'user': self._username, - 'password': self._password, - 'initial_device_display_name': 'Platypush Matrix integration', - }, - ) - - assert rs.get('access_token'), 'No access token provided by the server' - self._access_token = rs['access_token'] - self._user_id = rs['user_id'] - self._save_credentials(rs) - elif os.path.isfile(self._credentials_file): - with open(self._credentials_file, 'r') as f: - self._access_token = json.load(f)['access_token'] - self._refresh_user_id() - else: - raise AssertionError( - 'No username, password and access token provided nor stored' - ) - - self.logger.info( - f'Successfully logged in to {self._server_url} as {self._user_id}' + def _get_client(self) -> AsyncClient: + return MatrixClient( + homeserver=self._server_url, + user=self._user_id, + credentials_file=self._credentials_file, + autojoin_on_invite=self._autojoin_on_invite, + device_id=self._device_id, ) - @staticmethod - def _timestamp_to_datetime(t: int | float) -> datetime.datetime: - return datetime.datetime.fromtimestamp(t / 1000) + def _login(self) -> AsyncClient: + if not self._client: + self._client = self._get_client() - def _parse_event( - self, room_id: str, event: dict, users: dict - ) -> Optional[MatrixEvent]: - evt_type = event.get('type') - evt_class = None - room_info = self._rooms_cache.get(room_id, {}) - args: Dict[str, Any] = { - 'server_url': self._server_url, - 'room_id': room_id, - 'room_name': room_info.get('name'), - 'room_topic': room_info.get('topic'), - } - - if event.get('sender') and isinstance(event.get('sender'), str): - cached_user = users.get(event['sender'], {}) - args['sender_id'] = event['sender'] - args['sender_display_name'] = cached_user.get('display_name') - args['sender_avatar_url'] = cached_user.get('avatar_url') - - if event.get('origin_server_ts'): - args['server_timestamp'] = self._timestamp_to_datetime( - event['origin_server_ts'] + self._event_loop.run_until_complete( + self._client.login( + password=self._password, + device_name=self._device_name, + token=self._access_token, ) + ) - if evt_type == 'm.room.topic': - evt_class = MatrixRoomTopicChangeEvent - args['topic'] = event.get('content', {}).get('topic') # type: ignore - # TODO Handle encrypted rooms events (`m.room.encrypted`) - elif evt_type == 'm.room.message': - evt_class = MatrixMessageEvent - args['body'] = event.get('content', {}).get('body') # type: ignore - elif evt_type == 'm.room.member': - membership = event.get('content', {}).get('membership') - if membership == 'join': - evt_class = MatrixRoomJoinEvent - elif membership == 'invite': - evt_class = MatrixRoomInviteEvent - elif membership == 'leave': - evt_class = MatrixRoomLeaveEvent + return self._client - if evt_class: - return evt_class(**args) + def _connect(self): + if self.should_stop() or (self._matrix_proc and self._matrix_proc.is_alive()): + self.logger.debug('Already connected') + return - def _parse_invite_event( - self, room_id: str, events: Collection[dict] - ) -> MatrixRoomInviteMeEvent: - evt_args: Dict[str, Any] = { - 'server_url': self._server_url, - 'room_id': room_id, - } + self._login() + self._matrix_proc = multiprocessing.Process(target=self._run_client) + self._matrix_proc.start() - for event in events: - evt_type = event.get('type') - if evt_type == 'm.room.name': - evt_args['room_name'] = event.get('content', {}).get('name') - elif evt_type == 'm.room.topic': - evt_args['room_topic'] = event.get('content', {}).get('topic') - if event.get('origin_server_ts'): - evt_args['server_timestamp'] = self._timestamp_to_datetime( - event['origin_server_ts'] + async def _run_async_client(self): + await self._client.sync_forever(timeout=0, full_state=True) + + def _run_client(self): + set_thread_name('matrix-client') + + while True: + try: + self._event_loop.run_until_complete(self._run_async_client()) + except (ClientConnectionError, ServerDisconnectedError): + self.logger.warning( + 'Cannot connect to the Matrix server. Retrying in 15s' ) + self._should_stop.wait(15) + except KeyboardInterrupt: + pass + finally: + if self._client: + self._event_loop.run_until_complete(self._client.close()) + self._matrix_proc = None + self._connect() - if evt_args.get('room_name'): - self._rooms_cache[room_id] = { - 'room_id': room_id, - 'room_name': evt_args['room_name'], - 'room_topic': evt_args.get('room_topic'), - } + @action + @_action_wrapper + def send_message( + self, + room_id: str, + message_type: str = 'text', + body: str | None = None, + tx_id: str | None = None, + ignore_unverified_devices: bool = False, + ): + """ + Send a message to a room. - self._rewrite_rooms_cache() - - return MatrixRoomInviteMeEvent(**evt_args) - - def _retrieve_users_info(self, users: Collection[str]) -> Dict[str, dict]: - users_info = {user: {} for user in users} - retrieve = UserRetrieveWorker(self._server_url, self._access_token or '') - with multiprocessing.Pool(4) as pool: - pool_res = pool.map(retrieve, users_info.keys()) - - return { - user_id: { - 'user_id': user_id, - **info, - } - for user_id, info in pool_res - } - - def _retrieve_rooms_info(self, rooms: Collection[str]) -> Dict[str, dict]: - rooms_info = {room: {} for room in rooms} - retrieve = RoomRetrieveWorker(self._server_url, self._access_token or '') - with multiprocessing.Pool(4) as pool: - pool_res = pool.map(retrieve, rooms_info.keys()) - - return { - room_id: { - 'room_id': room_id, - **info, - } - for room_id, info in pool_res - } - - def _extract_senders(self, rooms) -> Dict[str, dict]: - cache_has_changes = False - senders = set() - - for room in rooms: - room_events = room.get('timeline', {}).get('events', []) - for evt in room_events: - if evt.get('type') == 'm.room.member': - cache_has_changes = True - self._users_cache[evt['sender']] = { - 'user_id': evt['sender'], - 'display_name': evt.get('content', {}).get('displayname'), - 'avatar_url': evt.get('content', {}).get('avatar_url'), - } - - senders.update({evt['sender'] for evt in room_events if evt.get('sender')}) - - missing_senders = {user for user in senders if user not in self._users_cache} - - if missing_senders: - cache_has_changes = True - self._users_cache.update(self._retrieve_users_info(missing_senders)) - - senders_map = { - user: self._users_cache.get(user, {'user_id': user}) for user in senders - } - - if cache_has_changes: - self._rewrite_users_cache() - - return senders_map - - def _extract_rooms(self, rooms: Collection[str]) -> Dict[str, dict]: - missing_rooms_info = { - room_id for room_id in rooms if not self._rooms_cache.get(room_id) - } - - if missing_rooms_info: - self._rooms_cache.update(self._retrieve_rooms_info(missing_rooms_info)) - self._rewrite_rooms_cache() - - return { - room_id: self._rooms_cache.get( - room_id, - { - 'room_id': room_id, + :param room_id: Room ID. + :param body: Message body. + :param message_type: Message type. Supported: `text`, `audio`, `video`, + `image`. Default: `text`. + :param tx_id: Unique transaction ID to associate to this message. + :param ignore_unverified_devices: If true, unverified devices will be + ignored (default: False). + """ + message_type = 'm.' + message_type + return self._event_loop.run_until_complete( + self._client.room_send( + message_type=message_type, + room_id=room_id, + tx_id=tx_id, + ignore_unverified_devices=ignore_unverified_devices, + content={ + 'body': body, }, ) - for room_id in rooms - } - - def _process_events(self, events: dict) -> Collection[MatrixEvent]: - rooms = events.get('rooms', {}) - joined_rooms = rooms.get('join', {}) - invited_rooms = rooms.get('invite', {}) - parsed_events = [] - senders = self._extract_senders(joined_rooms.values()) - self._extract_rooms(joined_rooms.keys()) - - # Create joined rooms events - for room_id, room in joined_rooms.items(): - room_events = room.get('timeline', {}).get('events', []) - parsed_room_events = [ - self._parse_event(room_id=room_id, event=event, users=senders) - for event in room_events - ] - - parsed_events.extend([evt for evt in parsed_room_events if evt]) - - # Create invite events - for room_id, room in invited_rooms.items(): - room_events = room.get('invite_state', {}).get('events', []) - parsed_room_event = self._parse_invite_event( - room_id=room_id, events=room_events - ) - parsed_events.append(parsed_room_event) - - if self._autojoin_on_invite: - self.join(room_id) - - parsed_events.sort(key=lambda e: e.server_timestamp) - return parsed_events - - def _reload_users_cache(self): - if os.path.isfile(self._users_cache_file): - with open(self._users_cache_file, 'r') as f: - self._users_cache.update(json.load(f)) - - def _rewrite_users_cache(self): - with open(self._users_cache_file, 'w') as f: - json.dump(self._users_cache, f) - - def _reload_rooms_cache(self): - if os.path.isfile(self._rooms_cache_file): - with open(self._rooms_cache_file, 'r') as f: - self._rooms_cache.update(json.load(f)) - - def _rewrite_rooms_cache(self): - with open(self._rooms_cache_file, 'w') as f: - json.dump(self._rooms_cache, f) + ) @action - def sync(self): + @_action_wrapper + def get_profile(self, user_id: str): """ - Sync the state for the currently logged session. + Retrieve the details about a user. + + :param user_id: User ID. + :return: .. schema:: matrix.MatrixProfileSchema """ - next_batch = None - sessions = {} - if os.path.isfile(self._sessions_file): - with open(self._sessions_file, 'r') as f: - sessions = json.load(f) - next_batch = sessions.get(self._user_id, {}).get('next_batch') - - if not next_batch: - self.logger.info('Synchronizing Matrix events') - - rs = self._execute('/r0/sync', params={'since': next_batch}) - events = self._process_events(rs) - if events and next_batch: - for event in events: - get_bus().post(event) - - if not sessions.get(self._user_id): - sessions[self._user_id] = {} - - sessions[self._user_id]['next_batch'] = rs.get('next_batch') - with open(self._sessions_file, 'w') as f: - json.dump(sessions, f) - - if not next_batch: - self.logger.info('Matrix events synchronized') + profile = self._event_loop.run_until_complete(self._client.get_profile(user_id)) # type: ignore + profile.user_id = user_id # type: ignore + return MatrixProfileSchema().dump(profile) @action - def join(self, room_id: str): + @_action_wrapper + def get_room(self, room_id: str): """ - Join a room by ID. + Retrieve the details about a room. - :param room_id: Room ID or alias. + :param room_id: room ID. + :return: .. schema:: matrix.MatrixRoomSchema """ - self._execute(f'/v3/join/{room_id}', method='post') - self.logger.info('Successfully joined room %s', room_id) + response = self._event_loop.run_until_complete( + self._client.room_get_state(room_id) + ) + assert not isinstance(response, RoomGetStateError), response.message + room_args = {'room_id': room_id, 'own_user_id': None, 'encrypted': False} + room_params = {} + + for evt in response.events: + if evt.get('type') == 'm.room.create': + room_args['own_user_id'] = evt.get('content', {}).get('creator') + elif evt.get('type') == 'm.room.encryption': + room_args['encrypted'] = False + elif evt.get('type') == 'm.room.name': + room_params['name'] = evt.get('content', {}).get('name') + elif evt.get('type') == 'm.room.topic': + room_params['topic'] = evt.get('content', {}).get('topic') + + room = MatrixRoom(**room_args) + for k, v in room_params.items(): + setattr(room, k, v) + return MatrixRoomSchema().dump(room) + + @action + @_action_wrapper + def get_devices(self): + """ + Get the list of devices associated to the current user. + + :return: .. schema:: matrix.MatrixDeviceSchema(many=True) + """ + response = self._event_loop.run_until_complete(self._client.devices()) + assert not isinstance(response, DevicesError), response.message + return MatrixDeviceSchema().dump(response.devices, many=True) + + @action + @_action_wrapper + def get_joined_rooms(self): + """ + Retrieve the rooms that the user has joined. + """ + response = self._event_loop.run_until_complete(self._client.joined_rooms()) + assert not isinstance(response, JoinedRoomsError), response.message + + return [self.get_room(room_id).output for room_id in response.rooms] + + @action + @_action_wrapper + def upload_keys(self): + """ + Synchronize the E2EE keys with the homeserver. + """ + self._event_loop.run_until_complete(self._client.keys_upload()) def main(self): - self.login() - self._reload_users_cache() - self._reload_rooms_cache() + self._connect() + self.wait_stop() - while not self._should_stop.is_set(): - try: - self.sync() - finally: - self._should_stop.wait(timeout=10) + def stop(self): + if self._matrix_proc: + self._matrix_proc.terminate() + self._matrix_proc.join(timeout=10) + self._matrix_proc.kill() + self._matrix_proc = None + + super().stop() # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/matrix/manifest.yaml b/platypush/plugins/matrix/manifest.yaml index 5449b930..d564aaaf 100644 --- a/platypush/plugins/matrix/manifest.yaml +++ b/platypush/plugins/matrix/manifest.yaml @@ -1,10 +1,25 @@ manifest: events: platypush.message.event.matrix.MatrixMessageEvent: when a message is received. + platypush.message.event.matrix.MatrixMediaMessageEvent: when a media message is received. + platypush.message.event.matrix.MatrixRoomCreatedEvent: when a room is created. platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a room. platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a room. - platypush.message.event.matrix.MatrixRoomInviteEvent: when a user (other than the currently logged one) is invited to a room. - platypush.message.event.matrix.MatrixRoomMeInviteEvent: when the currently logged in user is invited to a room. - platypush.message.event.matrix.MatrixRoomTopicChangeEvent: when the topic/title of a room changes. + platypush.message.event.matrix.MatrixRoomInviteEvent: when the user is invited to a room. + platypush.message.event.matrix.MatrixRoomTopicChangedEvent: when the topic/title of a room changes. + platypush.message.event.matrix.MatrixCallInviteEvent: when the user is invited to a call. + platypush.message.event.matrix.MatrixCallAnswerEvent: when a called user wishes to pick the call. + platypush.message.event.matrix.MatrixCallHangupEvent: when a called user exits the call. + platypush.message.event.matrix.MatrixStickerEvent: when a sticker is sent to a room. + platypush.message.event.matrix.MatrixEncryptedMessageEvent: | + when a message is received but the client doesn't + have the E2E keys to decrypt it, or encryption has not been enabled. + apt: + - libolm-devel + pacman: + - libolm + pip: + - matrix-nio[e2e] + - async_lru package: platypush.plugins.matrix type: plugin diff --git a/platypush/schemas/matrix.py b/platypush/schemas/matrix.py new file mode 100644 index 00000000..3cda4ebb --- /dev/null +++ b/platypush/schemas/matrix.py @@ -0,0 +1,109 @@ +from marshmallow import fields +from marshmallow.schema import Schema + +from platypush.schemas import DateTime + + +class MatrixProfileSchema(Schema): + user_id = fields.String( + required=True, + metadata={ + 'description': 'User ID', + 'example': '@myuser:matrix.example.org', + }, + ) + + display_name = fields.String( + attribute='displayname', + metadata={ + 'description': 'User display name', + 'example': 'Foo Bar', + }, + ) + + avatar_url = fields.URL( + metadata={ + 'description': 'User avatar URL', + 'example': 'mxc://matrix.platypush.tech/AbCdEfG0123456789', + } + ) + + +class MatrixRoomSchema(Schema): + room_id = fields.String( + required=True, + metadata={ + 'description': 'Room ID', + 'example': '!aBcDeFgHiJkMnO:matrix.example.org', + }, + ) + + name = fields.String( + metadata={ + 'description': 'Room name', + 'example': 'My Room', + } + ) + + display_name = fields.String( + metadata={ + 'description': 'Room display name', + 'example': 'My Room', + } + ) + + topic = fields.String( + metadata={ + 'description': 'Room topic', + 'example': 'My Room Topic', + } + ) + + avatar_url = fields.URL( + attribute='room_avatar_url', + metadata={ + 'description': 'Room avatar URL', + 'example': 'mxc://matrix.platypush.tech/AbCdEfG0123456789', + }, + ) + + owner_id = fields.String( + attribute='own_user_id', + metadata={ + 'description': 'Owner user ID', + 'example': '@myuser:matrix.example.org', + }, + ) + + encrypted = fields.Bool() + + +class MatrixDeviceSchema(Schema): + device_id = fields.String( + required=True, + attribute='id', + metadata={ + 'description': 'ABCDEFG', + }, + ) + + display_name = fields.String( + metadata={ + 'description': 'Device display name', + 'example': 'My Device', + } + ) + + last_seen_ip = fields.String( + metadata={ + 'description': 'Last IP associated to this device', + 'example': '1.2.3.4', + } + ) + + last_seen_date = DateTime( + metadata={ + 'description': 'The last time that the device was reported online', + 'example': '2022-07-23T17:20:01.254223', + } + ) diff --git a/setup.py b/setup.py index a5e379d7..5961bc60 100755 --- a/setup.py +++ b/setup.py @@ -268,5 +268,7 @@ setup( 'ngrok': ['pyngrok'], # Support for IRC integration 'irc': ['irc'], + # Support for the Matrix integration + 'matrix': ['matrix-nio'], }, )