From 48ec6ef68b1df988c69d6d81ba2aa35dde7d227b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 26 Aug 2022 23:48:29 +0200 Subject: [PATCH] Implemented proper support for encrypted media and added download method --- platypush/message/event/matrix.py | 54 ++++- platypush/plugins/matrix/__init__.py | 299 ++++++++++++++++++++----- platypush/plugins/matrix/manifest.yaml | 10 +- platypush/schemas/matrix.py | 34 ++- 4 files changed, 330 insertions(+), 67 deletions(-) diff --git a/platypush/message/event/matrix.py b/platypush/message/event/matrix.py index f6e7a3ac..71032e39 100644 --- a/platypush/message/event/matrix.py +++ b/platypush/message/event/matrix.py @@ -66,28 +66,58 @@ class MatrixMessageEvent(MatrixEvent): Event triggered when a message is received on a subscribed room. """ - def __init__(self, *args, body: str, **kwargs): + def __init__( + self, + *args, + body: str = '', + url: str | None = None, + thumbnail_url: str | None = None, + mimetype: str | None = None, + formatted_body: str | None = None, + format: str | None = None, + **kwargs + ): """ :param body: The body of the message. + :param url: The URL of the media file, if the message includes media. + :param thumbnail_url: The URL of the thumbnail, if the message includes media. + :param mimetype: The MIME type of the media file, if the message includes media. + :param formatted_body: The formatted body, if ``format`` is specified. + :param format: The format of the message (e.g. ``html`` or ``markdown``). """ - super().__init__(*args, body=body, **kwargs) + super().__init__( + *args, + body=body, + url=url, + thumbnail_url=thumbnail_url, + mimetype=mimetype, + formatted_body=formatted_body, + format=format, + **kwargs + ) -class MatrixMediaMessageEvent(MatrixMessageEvent): +class MatrixMessageImageEvent(MatrixEvent): """ - Event triggered when a media message is received on a subscribed room. + Event triggered when a message containing an image is received. """ - def __init__(self, *args, url: str, **kwargs): - """ - :param url: The URL of the media file. - """ - super().__init__(*args, url=url, **kwargs) - -class MatrixStickerEvent(MatrixMediaMessageEvent): +class MatrixMessageFileEvent(MatrixEvent): """ - Event triggered when a sticker is sent to a room. + Event triggered when a message containing a generic file is received. + """ + + +class MatrixMessageAudioEvent(MatrixEvent): + """ + Event triggered when a message containing an audio file is received. + """ + + +class MatrixMessageVideoEvent(MatrixEvent): + """ + Event triggered when a message containing a video file is received. """ diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py index a673ae20..2df1eba0 100644 --- a/platypush/plugins/matrix/__init__.py +++ b/platypush/plugins/matrix/__init__.py @@ -5,21 +5,23 @@ import logging import os import pathlib import re +import threading from dataclasses import dataclass from typing import Collection, Coroutine, Dict +from urllib.parse import urlparse from async_lru import alru_cache from nio import ( + Api, AsyncClient, AsyncClientConfig, CallAnswerEvent, CallHangupEvent, CallInviteEvent, - DevicesError, + ErrorResponse, Event, InviteNameEvent, - JoinedRoomsError, KeyVerificationStart, KeyVerificationAccept, KeyVerificationMac, @@ -31,12 +33,21 @@ from nio import ( MegolmEvent, ProfileGetResponse, RoomCreateEvent, + RoomEncryptedAudio, + RoomEncryptedFile, + RoomEncryptedImage, + RoomEncryptedMedia, + RoomEncryptedVideo, RoomGetEventError, - RoomGetStateError, RoomGetStateResponse, RoomMemberEvent, + RoomMessageAudio, + RoomMessageFile, + RoomMessageFormatted, RoomMessageText, + RoomMessageImage, RoomMessageMedia, + RoomMessageVideo, RoomTopicEvent, RoomUpgradeEvent, StickerEvent, @@ -46,8 +57,10 @@ from nio import ( ) from nio.client.async_client import client_session +from nio.crypto import decrypt_attachment from nio.crypto.device import OlmDevice from nio.exceptions import OlmUnverifiedDeviceError +from nio.responses import DownloadResponse from platypush.config import Config from platypush.context import get_bus @@ -56,21 +69,24 @@ from platypush.message.event.matrix import ( MatrixCallHangupEvent, MatrixCallInviteEvent, MatrixEncryptedMessageEvent, - MatrixMediaMessageEvent, + MatrixMessageAudioEvent, MatrixMessageEvent, + MatrixMessageFileEvent, + MatrixMessageImageEvent, + MatrixMessageVideoEvent, MatrixReactionEvent, MatrixRoomCreatedEvent, MatrixRoomInviteEvent, MatrixRoomJoinEvent, MatrixRoomLeaveEvent, MatrixRoomTopicChangedEvent, - MatrixStickerEvent, MatrixSyncEvent, ) from platypush.plugins import AsyncRunnablePlugin, action from platypush.schemas.matrix import ( MatrixDeviceSchema, + MatrixDownloadedFileSchema, MatrixEventIdSchema, MatrixMyDeviceSchema, MatrixProfileSchema, @@ -114,9 +130,11 @@ class MatrixClient(AsyncClient): if not store_path: store_path = os.path.join(Config.get('workdir'), 'matrix', 'store') # type: ignore - if store_path: - store_path = os.path.abspath(os.path.expanduser(store_path)) - pathlib.Path(store_path).mkdir(exist_ok=True, parents=True) + + assert store_path + store_path = os.path.abspath(os.path.expanduser(store_path)) + pathlib.Path(store_path).mkdir(exist_ok=True, parents=True) + if not config: config = AsyncClientConfig( max_limit_exceeded=0, @@ -135,6 +153,27 @@ class MatrixClient(AsyncClient): self._autotrust_users_whitelist = autotrust_users_whitelist or set() self._first_sync_performed = asyncio.Event() + self._encrypted_attachments_keystore_path = os.path.join( + store_path, 'attachment_keys.json' + ) + self._encrypted_attachments_keystore = {} + self._sync_store_timer: threading.Timer | None = None + keystore = {} + + try: + with open(self._encrypted_attachments_keystore_path, 'r') as f: + keystore = json.load(f) + except (ValueError, OSError): + with open(self._encrypted_attachments_keystore_path, 'w') as f: + f.write(json.dumps({})) + + pathlib.Path(self._encrypted_attachments_keystore_path).touch( + mode=0o600, exist_ok=True + ) + self._encrypted_attachments_keystore = { + tuple(key.split('|')): data for key, data in keystore.items() + } + async def _autojoin_room_callback(self, room: MatrixRoom, *_): await self.join(room.room_id) # type: ignore @@ -217,12 +256,12 @@ class MatrixClient(AsyncClient): self.logger.info('Synchronizing state') self._first_sync_performed.clear() + self._add_callbacks() sync_token = self.loaded_sync_token self.loaded_sync_token = '' await self.sync(sync_filter={'room': {'timeline': {'limit': 1}}}) self.loaded_sync_token = sync_token - self._add_callbacks() self._sync_devices_trust() self._first_sync_performed.set() @@ -316,14 +355,15 @@ class MatrixClient(AsyncClient): def _add_callbacks(self): self.add_event_callback(self._event_catch_all, Event) self.add_event_callback(self._on_invite, InviteNameEvent) # type: ignore - self.add_event_callback(self._on_room_message, RoomMessageText) # type: ignore - self.add_event_callback(self._on_media_message, RoomMessageMedia) # type: ignore + self.add_event_callback(self._on_message, RoomMessageText) # type: ignore + self.add_event_callback(self._on_message, RoomMessageMedia) # type: ignore + self.add_event_callback(self._on_message, RoomEncryptedMedia) # type: ignore + self.add_event_callback(self._on_message, StickerEvent) # type: ignore self.add_event_callback(self._on_room_member, RoomMemberEvent) # type: ignore self.add_event_callback(self._on_room_topic_changed, RoomTopicEvent) # type: ignore self.add_event_callback(self._on_call_invite, CallInviteEvent) # type: ignore self.add_event_callback(self._on_call_answer, CallAnswerEvent) # type: ignore self.add_event_callback(self._on_call_hangup, CallHangupEvent) # type: ignore - self.add_event_callback(self._on_sticker_message, StickerEvent) # type: ignore self.add_event_callback(self._on_unknown_event, UnknownEvent) # type: ignore self.add_event_callback(self._on_unknown_encrypted_event, UnknownEncryptedEvent) # type: ignore self.add_event_callback(self._on_unknown_encrypted_event, MegolmEvent) # type: ignore @@ -336,6 +376,24 @@ class MatrixClient(AsyncClient): if self._autojoin_on_invite: self.add_event_callback(self._autojoin_room_callback, InviteNameEvent) # type: ignore + def _sync_store(self): + self.logger.info('Synchronizing keystore') + serialized_keystore = json.dumps( + { + f'{server}|{media_id}': data + for ( + server, + media_id, + ), data in self._encrypted_attachments_keystore.items() + } + ) + + try: + with open(self._encrypted_attachments_keystore_path, 'w') as f: + f.write(serialized_keystore) + finally: + self._sync_store_timer = None + @alru_cache(maxsize=500) @client_session async def get_profile(self, user_id: str | None = None) -> ProfileGetResponse: @@ -360,6 +418,39 @@ class MatrixClient(AsyncClient): ), f'Could not retrieve profile for room {room_id}: {ret.message}' return ret + async def download( + self, + server_name: str, + media_id: str, + filename: str | None = None, + allow_remote: bool = True, + ): + response = await super().download( + server_name, media_id, filename, allow_remote=allow_remote + ) + + assert isinstance( + response, DownloadResponse + ), f'Could not download media {media_id}: {response}' + + encryption_data = self._encrypted_attachments_keystore.get( + (server_name, media_id) + ) + if encryption_data: + self.logger.info('Decrypting media %s using the available keys', media_id) + response.filename = encryption_data.get('body', response.filename) + response.content_type = encryption_data.get( + 'mimetype', response.content_type + ) + response.body = decrypt_attachment( + response.body, + key=encryption_data.get('key'), + hash=encryption_data.get('hash'), + iv=encryption_data.get('iv'), + ) + + return response + async def _event_base_args( self, room: MatrixRoom, event: Event | None = None ) -> dict: @@ -393,14 +484,60 @@ class MatrixClient(AsyncClient): ) ) - async def _on_room_message(self, room: MatrixRoom, event: RoomMessageText): + async def _on_message( + self, + room: MatrixRoom, + event: RoomMessageText | RoomMessageMedia | RoomEncryptedMedia | StickerEvent, + ): if self._first_sync_performed.is_set(): - get_bus().post( - MatrixMessageEvent( - **(await self._event_base_args(room, event)), - body=event.body, - ) - ) + evt_type = MatrixMessageEvent + evt_args = { + 'body': event.body, + 'url': getattr(event, 'url', None), + **(await self._event_base_args(room, event)), + } + + if isinstance(event, (RoomMessageMedia, RoomEncryptedMedia, StickerEvent)): + evt_args['url'] = event.url + + if isinstance(event, RoomEncryptedMedia): + evt_args['thumbnail_url'] = event.thumbnail_url + evt_args['mimetype'] = event.mimetype + self._store_encrypted_media_keys(event) + if isinstance(event, RoomMessageFormatted): + evt_args['format'] = event.format + evt_args['formatted_body'] = event.formatted_body + + if isinstance(event, (RoomMessageImage, RoomEncryptedImage)): + evt_type = MatrixMessageImageEvent + elif isinstance(event, (RoomMessageAudio, RoomEncryptedAudio)): + evt_type = MatrixMessageAudioEvent + elif isinstance(event, (RoomMessageVideo, RoomEncryptedVideo)): + evt_type = MatrixMessageVideoEvent + elif isinstance(event, (RoomMessageFile, RoomEncryptedFile)): + evt_type = MatrixMessageFileEvent + + get_bus().post(evt_type(**evt_args)) + + def _store_encrypted_media_keys(self, event: RoomEncryptedMedia): + url = event.url.strip('/') + parsed_url = urlparse(url) + homeserver = parsed_url.netloc.strip('/') + media_key = (homeserver, parsed_url.path.strip('/')) + + self._encrypted_attachments_keystore[media_key] = { + 'url': url, + 'body': event.body, + 'key': event.key['k'], + 'hash': event.hashes['sha256'], + 'iv': event.iv, + 'homeserver': homeserver, + 'mimetype': event.mimetype, + } + + if not self._sync_store_timer: + self._sync_store_timer = threading.Timer(5, self._sync_store) + self._sync_store_timer.start() async def _on_room_member(self, room: MatrixRoom, event: RoomMemberEvent): evt_type = None @@ -465,24 +602,6 @@ class MatrixClient(AsyncClient): ) ) - async def _on_media_message(self, room: MatrixRoom, event: RoomMessageMedia): - if self._first_sync_performed.is_set(): - get_bus().post( - MatrixMediaMessageEvent( - url=event.url, - **(await self._event_base_args(room, event)), - ) - ) - - async def _on_sticker_message(self, room: MatrixRoom, event: StickerEvent): - if self._first_sync_performed.is_set(): - get_bus().post( - MatrixStickerEvent( - url=event.url, - **(await self._event_base_args(room, event)), - ) - ) - def _get_sas(self, event): sas = self.key_verifications.get(event.transaction_id) if not sas: @@ -650,12 +769,23 @@ class MatrixPlugin(AsyncRunnablePlugin): the same list. Then confirm that you see the same emojis, and your device will be automatically marked as trusted. + All the URLs returned by actions and events on this plugin are in the + ``mxc:///`` format. You can either convert them to HTTP + through the :meth:`.mxc_to_http` method, or download them through the + :meth:`.download` method. + Triggers: * :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received. - * :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when - a media message is received. + * :class:`platypush.message.event.matrix.MatrixMessageImageEvent`: when a + message containing an image is received. + * :class:`platypush.message.event.matrix.MatrixMessageAudioEvent`: when a + message containing an audio file is received. + * :class:`platypush.message.event.matrix.MatrixMessageVideoEvent`: when a + message containing a video file is received. + * :class:`platypush.message.event.matrix.MatrixMessageFileEvent`: when a + message containing a generic file is received. * :class:`platypush.message.event.matrix.MatrixSyncEvent`: when the startup synchronization has been completed and the plugin is ready to use. @@ -675,8 +805,6 @@ class MatrixPlugin(AsyncRunnablePlugin): called user wishes to pick the call. * :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a called user exits the call. - * :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a - sticker is sent to a room. * :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`: when a message is received but the client doesn't have the E2E keys to decrypt it, or encryption has not been enabled. @@ -691,6 +819,7 @@ class MatrixPlugin(AsyncRunnablePlugin): access_token: str | None = None, device_name: str | None = 'platypush', device_id: str | None = None, + download_path: str | None = None, autojoin_on_invite: bool = True, autotrust_devices: bool = False, autotrust_devices_whitelist: Collection[str] | None = None, @@ -717,6 +846,8 @@ class MatrixPlugin(AsyncRunnablePlugin): :param access_token: User access token. :param device_name: The name of this device/connection (default: ``platypush``). :param device_id: Use an existing ``device_id`` for the sessions. + :param download_path: The folder where downloaded media will be saved + (default: ``~/Downloads``). :param autojoin_on_invite: Whether the account should automatically join rooms upon invite. If false, then you may want to implement your own logic in an event hook when a @@ -750,6 +881,10 @@ class MatrixPlugin(AsyncRunnablePlugin): self._access_token = access_token self._device_name = device_name self._device_id = device_id + self._download_path = download_path or os.path.join( + os.path.expanduser('~'), 'Downloads' + ) + self._autojoin_on_invite = autojoin_on_invite self._autotrust_devices = autotrust_devices self._autotrust_devices_whitelist = set(autotrust_devices_whitelist or []) @@ -797,6 +932,8 @@ class MatrixPlugin(AsyncRunnablePlugin): await self.client.sync_forever(timeout=30000, full_state=True) except KeyboardInterrupt: pass + except Exception as e: + self.logger.exception(e) finally: try: await self.client.close() @@ -811,6 +948,7 @@ class MatrixPlugin(AsyncRunnablePlugin): except OlmUnverifiedDeviceError as e: raise AssertionError(str(e)) + assert not isinstance(ret, ErrorResponse), ret.message if hasattr(ret, 'transport_response'): response = ret.transport_response assert response.ok, f'{coro} failed with status {response.status}' @@ -853,11 +991,7 @@ class MatrixPlugin(AsyncRunnablePlugin): ) ) - assert self._loop - ret = asyncio.run_coroutine_threadsafe( - ret.transport_response.json(), self._loop - ).result() - + ret = self._loop_execute(ret.transport_response.json()) return MatrixEventIdSchema().dump(ret) @action @@ -881,8 +1015,6 @@ class MatrixPlugin(AsyncRunnablePlugin): :return: .. schema:: matrix.MatrixRoomSchema """ response = self._loop_execute(self.client.room_get_state(room_id)) # type: ignore - assert not isinstance(response, RoomGetStateError), response.message - room_args = {'room_id': room_id, 'own_user_id': None, 'encrypted': False} room_params = {} @@ -909,7 +1041,6 @@ class MatrixPlugin(AsyncRunnablePlugin): :return: .. schema:: matrix.MatrixMyDeviceSchema(many=True) """ response = self._loop_execute(self.client.devices()) - assert not isinstance(response, DevicesError), response.message return MatrixMyDeviceSchema().dump(response.devices, many=True) @action @@ -927,7 +1058,6 @@ class MatrixPlugin(AsyncRunnablePlugin): Retrieve the rooms that the user has joined. """ response = self._loop_execute(self.client.joined_rooms()) - assert not isinstance(response, JoinedRoomsError), response.message return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore @action @@ -937,7 +1067,7 @@ class MatrixPlugin(AsyncRunnablePlugin): """ self._loop_execute(self.client.keys_upload()) - def _get_device(self, device_id: str): + def _get_device(self, device_id: str) -> OlmDevice: device = self.client.get_device(device_id) assert device, f'No such device_id: {device_id}' return device @@ -962,5 +1092,72 @@ class MatrixPlugin(AsyncRunnablePlugin): device = self._get_device(device_id) self.client.unverify_device(device) + @action + def mxc_to_http(self, url: str, homeserver: str | None = None) -> str: + """ + Convert a Matrix URL (in the format ``mxc://server/media_id``) to an + HTTP URL. + + Note that invoking this function on a URL containing encrypted content + (i.e. a URL containing media sent to an encrypted room) will provide a + URL that points to encrypted content. The best way to deal with + encrypted media is by using :meth:`.download` to download the media + locally. + + :param url: The MXC URL to be converted. + :param homeserver: The hosting homeserver (default: the same as the URL). + :return: The converted HTTP(s) URL. + """ + http_url = Api.mxc_to_http(url, homeserver=homeserver) + assert http_url, f'Could not convert URL {url}' + return http_url + + @action + def download( + self, + url: str, + download_path: str | None = None, + filename: str | None = None, + allow_remote=True, + ): + """ + Download a file given its Matrix URL. + + Note that URLs that point to encrypted resources will be automatically + decrypted only if they were received on a room joined by this account. + + :param url: Matrix URL, in the format + :return: .. schema:: matrix.MatrixDownloadedFileSchema + """ + parsed_url = urlparse(url) + server = parsed_url.netloc.strip('/') + media_id = parsed_url.path.strip('/') + + response = self._loop_execute( + self.client.download( + server, media_id, filename=filename, allow_remote=allow_remote + ) + ) + + if not download_path: + download_path = self._download_path + if not filename: + filename = response.filename or media_id + + outfile = os.path.join(str(download_path), str(filename)) + pathlib.Path(download_path).mkdir(parents=True, exist_ok=True) + + with open(outfile, 'wb') as f: + f.write(response.body) + + return MatrixDownloadedFileSchema().dump( + { + 'url': url, + 'path': outfile, + 'size': len(response.body), + 'content_type': response.content_type, + } + ) + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/matrix/manifest.yaml b/platypush/plugins/matrix/manifest.yaml index 3f0b5e5b..1b8b2fc9 100644 --- a/platypush/plugins/matrix/manifest.yaml +++ b/platypush/plugins/matrix/manifest.yaml @@ -1,7 +1,14 @@ manifest: events: platypush.message.event.matrix.MatrixMessageEvent: when a message is received. - platypush.message.event.matrix.MatrixMediaMessageEvent: when a media message is received. + platypush.message.event.matrix.MatrixMessageImageEvent: when a message + containing an image is received. + platypush.message.event.matrix.MatrixMessageAudioEvent: when a message + containing an audio file is received. + platypush.message.event.matrix.MatrixMessageVideoEvent: when a message + containing a video file is received. + platypush.message.event.matrix.MatrixMessageFileEvent: when a message + containing a generic file is received. platypush.message.event.matrix.MatrixSyncEvent: when the startup synchronization has been completed and the plugin is ready to use. platypush.message.event.matrix.MatrixRoomCreatedEvent: when a room is created. @@ -12,7 +19,6 @@ manifest: platypush.message.event.matrix.MatrixCallInviteEvent: when the user is invited to a call. platypush.message.event.matrix.MatrixCallAnswerEvent: when a called user wishes to pick the call. platypush.message.event.matrix.MatrixCallHangupEvent: when a called user exits the call. - platypush.message.event.matrix.MatrixStickerEvent: when a sticker is sent to a room. platypush.message.event.matrix.MatrixEncryptedMessageEvent: | when a message is received but the client doesn't have the E2E keys to decrypt it, or encryption has not been enabled. diff --git a/platypush/schemas/matrix.py b/platypush/schemas/matrix.py index b28a6cbf..ae9fe679 100644 --- a/platypush/schemas/matrix.py +++ b/platypush/schemas/matrix.py @@ -34,7 +34,7 @@ class MatrixProfileSchema(Schema): avatar_url = fields.URL( metadata={ 'description': 'User avatar URL', - 'example': 'mxc://matrix.platypush.tech/AbCdEfG0123456789', + 'example': 'mxc://matrix.example.org/AbCdEfG0123456789', } ) @@ -73,7 +73,7 @@ class MatrixRoomSchema(Schema): attribute='room_avatar_url', metadata={ 'description': 'Room avatar URL', - 'example': 'mxc://matrix.platypush.tech/AbCdEfG0123456789', + 'example': 'mxc://matrix.example.org/AbCdEfG0123456789', }, ) @@ -157,3 +157,33 @@ class MatrixMyDeviceSchema(Schema): 'example': '2022-07-23T17:20:01.254223', } ) + + +class MatrixDownloadedFileSchema(Schema): + url = fields.String( + metadata={ + 'description': 'Matrix URL of the original resource', + 'example': 'mxc://matrix.example.org/YhQycHvFOvtiDDbEeWWtEhXx', + }, + ) + + path = fields.String( + metadata={ + 'description': 'Local path where the file has been saved', + 'example': '/home/user/Downloads/image.png', + } + ) + + content_type = fields.String( + metadata={ + 'description': 'Content type of the downloaded file', + 'example': 'image/png', + } + ) + + size = fields.Int( + metadata={ + 'description': 'Length in bytes of the output file', + 'example': 1024, + } + )