Implemented proper support for encrypted media and added download method

This commit is contained in:
Fabio Manganiello 2022-08-26 23:48:29 +02:00
parent e4eb4cd7dc
commit 48ec6ef68b
Signed by: blacklight
GPG key ID: D90FBA7F76362774
4 changed files with 330 additions and 67 deletions

View file

@ -66,28 +66,58 @@ class MatrixMessageEvent(MatrixEvent):
Event triggered when a message is received on a subscribed room.
"""
def __init__(self, *args, body: str, **kwargs):
def __init__(
self,
*args,
body: str = '',
url: str | None = None,
thumbnail_url: str | None = None,
mimetype: str | None = None,
formatted_body: str | None = None,
format: str | None = None,
**kwargs
):
"""
:param body: The body of the message.
:param url: The URL of the media file, if the message includes media.
:param thumbnail_url: The URL of the thumbnail, if the message includes media.
:param mimetype: The MIME type of the media file, if the message includes media.
:param formatted_body: The formatted body, if ``format`` is specified.
:param format: The format of the message (e.g. ``html`` or ``markdown``).
"""
super().__init__(*args, body=body, **kwargs)
super().__init__(
*args,
body=body,
url=url,
thumbnail_url=thumbnail_url,
mimetype=mimetype,
formatted_body=formatted_body,
format=format,
**kwargs
)
class MatrixMediaMessageEvent(MatrixMessageEvent):
class MatrixMessageImageEvent(MatrixEvent):
"""
Event triggered when a media message is received on a subscribed room.
Event triggered when a message containing an image is received.
"""
def __init__(self, *args, url: str, **kwargs):
"""
:param url: The URL of the media file.
"""
super().__init__(*args, url=url, **kwargs)
class MatrixStickerEvent(MatrixMediaMessageEvent):
class MatrixMessageFileEvent(MatrixEvent):
"""
Event triggered when a sticker is sent to a room.
Event triggered when a message containing a generic file is received.
"""
class MatrixMessageAudioEvent(MatrixEvent):
"""
Event triggered when a message containing an audio file is received.
"""
class MatrixMessageVideoEvent(MatrixEvent):
"""
Event triggered when a message containing a video file is received.
"""

View file

@ -5,21 +5,23 @@ import logging
import os
import pathlib
import re
import threading
from dataclasses import dataclass
from typing import Collection, Coroutine, Dict
from urllib.parse import urlparse
from async_lru import alru_cache
from nio import (
Api,
AsyncClient,
AsyncClientConfig,
CallAnswerEvent,
CallHangupEvent,
CallInviteEvent,
DevicesError,
ErrorResponse,
Event,
InviteNameEvent,
JoinedRoomsError,
KeyVerificationStart,
KeyVerificationAccept,
KeyVerificationMac,
@ -31,12 +33,21 @@ from nio import (
MegolmEvent,
ProfileGetResponse,
RoomCreateEvent,
RoomEncryptedAudio,
RoomEncryptedFile,
RoomEncryptedImage,
RoomEncryptedMedia,
RoomEncryptedVideo,
RoomGetEventError,
RoomGetStateError,
RoomGetStateResponse,
RoomMemberEvent,
RoomMessageAudio,
RoomMessageFile,
RoomMessageFormatted,
RoomMessageText,
RoomMessageImage,
RoomMessageMedia,
RoomMessageVideo,
RoomTopicEvent,
RoomUpgradeEvent,
StickerEvent,
@ -46,8 +57,10 @@ from nio import (
)
from nio.client.async_client import client_session
from nio.crypto import decrypt_attachment
from nio.crypto.device import OlmDevice
from nio.exceptions import OlmUnverifiedDeviceError
from nio.responses import DownloadResponse
from platypush.config import Config
from platypush.context import get_bus
@ -56,21 +69,24 @@ from platypush.message.event.matrix import (
MatrixCallHangupEvent,
MatrixCallInviteEvent,
MatrixEncryptedMessageEvent,
MatrixMediaMessageEvent,
MatrixMessageAudioEvent,
MatrixMessageEvent,
MatrixMessageFileEvent,
MatrixMessageImageEvent,
MatrixMessageVideoEvent,
MatrixReactionEvent,
MatrixRoomCreatedEvent,
MatrixRoomInviteEvent,
MatrixRoomJoinEvent,
MatrixRoomLeaveEvent,
MatrixRoomTopicChangedEvent,
MatrixStickerEvent,
MatrixSyncEvent,
)
from platypush.plugins import AsyncRunnablePlugin, action
from platypush.schemas.matrix import (
MatrixDeviceSchema,
MatrixDownloadedFileSchema,
MatrixEventIdSchema,
MatrixMyDeviceSchema,
MatrixProfileSchema,
@ -114,9 +130,11 @@ class MatrixClient(AsyncClient):
if not store_path:
store_path = os.path.join(Config.get('workdir'), 'matrix', 'store') # type: ignore
if store_path:
store_path = os.path.abspath(os.path.expanduser(store_path))
pathlib.Path(store_path).mkdir(exist_ok=True, parents=True)
assert store_path
store_path = os.path.abspath(os.path.expanduser(store_path))
pathlib.Path(store_path).mkdir(exist_ok=True, parents=True)
if not config:
config = AsyncClientConfig(
max_limit_exceeded=0,
@ -135,6 +153,27 @@ class MatrixClient(AsyncClient):
self._autotrust_users_whitelist = autotrust_users_whitelist or set()
self._first_sync_performed = asyncio.Event()
self._encrypted_attachments_keystore_path = os.path.join(
store_path, 'attachment_keys.json'
)
self._encrypted_attachments_keystore = {}
self._sync_store_timer: threading.Timer | None = None
keystore = {}
try:
with open(self._encrypted_attachments_keystore_path, 'r') as f:
keystore = json.load(f)
except (ValueError, OSError):
with open(self._encrypted_attachments_keystore_path, 'w') as f:
f.write(json.dumps({}))
pathlib.Path(self._encrypted_attachments_keystore_path).touch(
mode=0o600, exist_ok=True
)
self._encrypted_attachments_keystore = {
tuple(key.split('|')): data for key, data in keystore.items()
}
async def _autojoin_room_callback(self, room: MatrixRoom, *_):
await self.join(room.room_id) # type: ignore
@ -217,12 +256,12 @@ class MatrixClient(AsyncClient):
self.logger.info('Synchronizing state')
self._first_sync_performed.clear()
self._add_callbacks()
sync_token = self.loaded_sync_token
self.loaded_sync_token = ''
await self.sync(sync_filter={'room': {'timeline': {'limit': 1}}})
self.loaded_sync_token = sync_token
self._add_callbacks()
self._sync_devices_trust()
self._first_sync_performed.set()
@ -316,14 +355,15 @@ class MatrixClient(AsyncClient):
def _add_callbacks(self):
self.add_event_callback(self._event_catch_all, Event)
self.add_event_callback(self._on_invite, InviteNameEvent) # type: ignore
self.add_event_callback(self._on_room_message, RoomMessageText) # type: ignore
self.add_event_callback(self._on_media_message, RoomMessageMedia) # type: ignore
self.add_event_callback(self._on_message, RoomMessageText) # type: ignore
self.add_event_callback(self._on_message, RoomMessageMedia) # type: ignore
self.add_event_callback(self._on_message, RoomEncryptedMedia) # type: ignore
self.add_event_callback(self._on_message, StickerEvent) # type: ignore
self.add_event_callback(self._on_room_member, RoomMemberEvent) # type: ignore
self.add_event_callback(self._on_room_topic_changed, RoomTopicEvent) # type: ignore
self.add_event_callback(self._on_call_invite, CallInviteEvent) # type: ignore
self.add_event_callback(self._on_call_answer, CallAnswerEvent) # type: ignore
self.add_event_callback(self._on_call_hangup, CallHangupEvent) # type: ignore
self.add_event_callback(self._on_sticker_message, StickerEvent) # type: ignore
self.add_event_callback(self._on_unknown_event, UnknownEvent) # type: ignore
self.add_event_callback(self._on_unknown_encrypted_event, UnknownEncryptedEvent) # type: ignore
self.add_event_callback(self._on_unknown_encrypted_event, MegolmEvent) # type: ignore
@ -336,6 +376,24 @@ class MatrixClient(AsyncClient):
if self._autojoin_on_invite:
self.add_event_callback(self._autojoin_room_callback, InviteNameEvent) # type: ignore
def _sync_store(self):
self.logger.info('Synchronizing keystore')
serialized_keystore = json.dumps(
{
f'{server}|{media_id}': data
for (
server,
media_id,
), data in self._encrypted_attachments_keystore.items()
}
)
try:
with open(self._encrypted_attachments_keystore_path, 'w') as f:
f.write(serialized_keystore)
finally:
self._sync_store_timer = None
@alru_cache(maxsize=500)
@client_session
async def get_profile(self, user_id: str | None = None) -> ProfileGetResponse:
@ -360,6 +418,39 @@ class MatrixClient(AsyncClient):
), f'Could not retrieve profile for room {room_id}: {ret.message}'
return ret
async def download(
self,
server_name: str,
media_id: str,
filename: str | None = None,
allow_remote: bool = True,
):
response = await super().download(
server_name, media_id, filename, allow_remote=allow_remote
)
assert isinstance(
response, DownloadResponse
), f'Could not download media {media_id}: {response}'
encryption_data = self._encrypted_attachments_keystore.get(
(server_name, media_id)
)
if encryption_data:
self.logger.info('Decrypting media %s using the available keys', media_id)
response.filename = encryption_data.get('body', response.filename)
response.content_type = encryption_data.get(
'mimetype', response.content_type
)
response.body = decrypt_attachment(
response.body,
key=encryption_data.get('key'),
hash=encryption_data.get('hash'),
iv=encryption_data.get('iv'),
)
return response
async def _event_base_args(
self, room: MatrixRoom, event: Event | None = None
) -> dict:
@ -393,14 +484,60 @@ class MatrixClient(AsyncClient):
)
)
async def _on_room_message(self, room: MatrixRoom, event: RoomMessageText):
async def _on_message(
self,
room: MatrixRoom,
event: RoomMessageText | RoomMessageMedia | RoomEncryptedMedia | StickerEvent,
):
if self._first_sync_performed.is_set():
get_bus().post(
MatrixMessageEvent(
**(await self._event_base_args(room, event)),
body=event.body,
)
)
evt_type = MatrixMessageEvent
evt_args = {
'body': event.body,
'url': getattr(event, 'url', None),
**(await self._event_base_args(room, event)),
}
if isinstance(event, (RoomMessageMedia, RoomEncryptedMedia, StickerEvent)):
evt_args['url'] = event.url
if isinstance(event, RoomEncryptedMedia):
evt_args['thumbnail_url'] = event.thumbnail_url
evt_args['mimetype'] = event.mimetype
self._store_encrypted_media_keys(event)
if isinstance(event, RoomMessageFormatted):
evt_args['format'] = event.format
evt_args['formatted_body'] = event.formatted_body
if isinstance(event, (RoomMessageImage, RoomEncryptedImage)):
evt_type = MatrixMessageImageEvent
elif isinstance(event, (RoomMessageAudio, RoomEncryptedAudio)):
evt_type = MatrixMessageAudioEvent
elif isinstance(event, (RoomMessageVideo, RoomEncryptedVideo)):
evt_type = MatrixMessageVideoEvent
elif isinstance(event, (RoomMessageFile, RoomEncryptedFile)):
evt_type = MatrixMessageFileEvent
get_bus().post(evt_type(**evt_args))
def _store_encrypted_media_keys(self, event: RoomEncryptedMedia):
url = event.url.strip('/')
parsed_url = urlparse(url)
homeserver = parsed_url.netloc.strip('/')
media_key = (homeserver, parsed_url.path.strip('/'))
self._encrypted_attachments_keystore[media_key] = {
'url': url,
'body': event.body,
'key': event.key['k'],
'hash': event.hashes['sha256'],
'iv': event.iv,
'homeserver': homeserver,
'mimetype': event.mimetype,
}
if not self._sync_store_timer:
self._sync_store_timer = threading.Timer(5, self._sync_store)
self._sync_store_timer.start()
async def _on_room_member(self, room: MatrixRoom, event: RoomMemberEvent):
evt_type = None
@ -465,24 +602,6 @@ class MatrixClient(AsyncClient):
)
)
async def _on_media_message(self, room: MatrixRoom, event: RoomMessageMedia):
if self._first_sync_performed.is_set():
get_bus().post(
MatrixMediaMessageEvent(
url=event.url,
**(await self._event_base_args(room, event)),
)
)
async def _on_sticker_message(self, room: MatrixRoom, event: StickerEvent):
if self._first_sync_performed.is_set():
get_bus().post(
MatrixStickerEvent(
url=event.url,
**(await self._event_base_args(room, event)),
)
)
def _get_sas(self, event):
sas = self.key_verifications.get(event.transaction_id)
if not sas:
@ -650,12 +769,23 @@ class MatrixPlugin(AsyncRunnablePlugin):
the same list. Then confirm that you see the same emojis, and your
device will be automatically marked as trusted.
All the URLs returned by actions and events on this plugin are in the
``mxc://<server>/<media_id>`` format. You can either convert them to HTTP
through the :meth:`.mxc_to_http` method, or download them through the
:meth:`.download` method.
Triggers:
* :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a
message is received.
* :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when
a media message is received.
* :class:`platypush.message.event.matrix.MatrixMessageImageEvent`: when a
message containing an image is received.
* :class:`platypush.message.event.matrix.MatrixMessageAudioEvent`: when a
message containing an audio file is received.
* :class:`platypush.message.event.matrix.MatrixMessageVideoEvent`: when a
message containing a video file is received.
* :class:`platypush.message.event.matrix.MatrixMessageFileEvent`: when a
message containing a generic file is received.
* :class:`platypush.message.event.matrix.MatrixSyncEvent`: when the
startup synchronization has been completed and the plugin is ready to
use.
@ -675,8 +805,6 @@ class MatrixPlugin(AsyncRunnablePlugin):
called user wishes to pick the call.
* :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a
called user exits the call.
* :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a
sticker is sent to a room.
* :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`:
when a message is received but the client doesn't have the E2E keys
to decrypt it, or encryption has not been enabled.
@ -691,6 +819,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
access_token: str | None = None,
device_name: str | None = 'platypush',
device_id: str | None = None,
download_path: str | None = None,
autojoin_on_invite: bool = True,
autotrust_devices: bool = False,
autotrust_devices_whitelist: Collection[str] | None = None,
@ -717,6 +846,8 @@ class MatrixPlugin(AsyncRunnablePlugin):
:param access_token: User access token.
:param device_name: The name of this device/connection (default: ``platypush``).
:param device_id: Use an existing ``device_id`` for the sessions.
:param download_path: The folder where downloaded media will be saved
(default: ``~/Downloads``).
:param autojoin_on_invite: Whether the account should automatically
join rooms upon invite. If false, then you may want to implement your
own logic in an event hook when a
@ -750,6 +881,10 @@ class MatrixPlugin(AsyncRunnablePlugin):
self._access_token = access_token
self._device_name = device_name
self._device_id = device_id
self._download_path = download_path or os.path.join(
os.path.expanduser('~'), 'Downloads'
)
self._autojoin_on_invite = autojoin_on_invite
self._autotrust_devices = autotrust_devices
self._autotrust_devices_whitelist = set(autotrust_devices_whitelist or [])
@ -797,6 +932,8 @@ class MatrixPlugin(AsyncRunnablePlugin):
await self.client.sync_forever(timeout=30000, full_state=True)
except KeyboardInterrupt:
pass
except Exception as e:
self.logger.exception(e)
finally:
try:
await self.client.close()
@ -811,6 +948,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
except OlmUnverifiedDeviceError as e:
raise AssertionError(str(e))
assert not isinstance(ret, ErrorResponse), ret.message
if hasattr(ret, 'transport_response'):
response = ret.transport_response
assert response.ok, f'{coro} failed with status {response.status}'
@ -853,11 +991,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
)
)
assert self._loop
ret = asyncio.run_coroutine_threadsafe(
ret.transport_response.json(), self._loop
).result()
ret = self._loop_execute(ret.transport_response.json())
return MatrixEventIdSchema().dump(ret)
@action
@ -881,8 +1015,6 @@ class MatrixPlugin(AsyncRunnablePlugin):
:return: .. schema:: matrix.MatrixRoomSchema
"""
response = self._loop_execute(self.client.room_get_state(room_id)) # type: ignore
assert not isinstance(response, RoomGetStateError), response.message
room_args = {'room_id': room_id, 'own_user_id': None, 'encrypted': False}
room_params = {}
@ -909,7 +1041,6 @@ class MatrixPlugin(AsyncRunnablePlugin):
:return: .. schema:: matrix.MatrixMyDeviceSchema(many=True)
"""
response = self._loop_execute(self.client.devices())
assert not isinstance(response, DevicesError), response.message
return MatrixMyDeviceSchema().dump(response.devices, many=True)
@action
@ -927,7 +1058,6 @@ class MatrixPlugin(AsyncRunnablePlugin):
Retrieve the rooms that the user has joined.
"""
response = self._loop_execute(self.client.joined_rooms())
assert not isinstance(response, JoinedRoomsError), response.message
return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore
@action
@ -937,7 +1067,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
"""
self._loop_execute(self.client.keys_upload())
def _get_device(self, device_id: str):
def _get_device(self, device_id: str) -> OlmDevice:
device = self.client.get_device(device_id)
assert device, f'No such device_id: {device_id}'
return device
@ -962,5 +1092,72 @@ class MatrixPlugin(AsyncRunnablePlugin):
device = self._get_device(device_id)
self.client.unverify_device(device)
@action
def mxc_to_http(self, url: str, homeserver: str | None = None) -> str:
"""
Convert a Matrix URL (in the format ``mxc://server/media_id``) to an
HTTP URL.
Note that invoking this function on a URL containing encrypted content
(i.e. a URL containing media sent to an encrypted room) will provide a
URL that points to encrypted content. The best way to deal with
encrypted media is by using :meth:`.download` to download the media
locally.
:param url: The MXC URL to be converted.
:param homeserver: The hosting homeserver (default: the same as the URL).
:return: The converted HTTP(s) URL.
"""
http_url = Api.mxc_to_http(url, homeserver=homeserver)
assert http_url, f'Could not convert URL {url}'
return http_url
@action
def download(
self,
url: str,
download_path: str | None = None,
filename: str | None = None,
allow_remote=True,
):
"""
Download a file given its Matrix URL.
Note that URLs that point to encrypted resources will be automatically
decrypted only if they were received on a room joined by this account.
:param url: Matrix URL, in the format
:return: .. schema:: matrix.MatrixDownloadedFileSchema
"""
parsed_url = urlparse(url)
server = parsed_url.netloc.strip('/')
media_id = parsed_url.path.strip('/')
response = self._loop_execute(
self.client.download(
server, media_id, filename=filename, allow_remote=allow_remote
)
)
if not download_path:
download_path = self._download_path
if not filename:
filename = response.filename or media_id
outfile = os.path.join(str(download_path), str(filename))
pathlib.Path(download_path).mkdir(parents=True, exist_ok=True)
with open(outfile, 'wb') as f:
f.write(response.body)
return MatrixDownloadedFileSchema().dump(
{
'url': url,
'path': outfile,
'size': len(response.body),
'content_type': response.content_type,
}
)
# vim:sw=4:ts=4:et:

View file

@ -1,7 +1,14 @@
manifest:
events:
platypush.message.event.matrix.MatrixMessageEvent: when a message is received.
platypush.message.event.matrix.MatrixMediaMessageEvent: when a media message is received.
platypush.message.event.matrix.MatrixMessageImageEvent: when a message
containing an image is received.
platypush.message.event.matrix.MatrixMessageAudioEvent: when a message
containing an audio file is received.
platypush.message.event.matrix.MatrixMessageVideoEvent: when a message
containing a video file is received.
platypush.message.event.matrix.MatrixMessageFileEvent: when a message
containing a generic file is received.
platypush.message.event.matrix.MatrixSyncEvent: when the startup
synchronization has been completed and the plugin is ready to use.
platypush.message.event.matrix.MatrixRoomCreatedEvent: when a room is created.
@ -12,7 +19,6 @@ manifest:
platypush.message.event.matrix.MatrixCallInviteEvent: when the user is invited to a call.
platypush.message.event.matrix.MatrixCallAnswerEvent: when a called user wishes to pick the call.
platypush.message.event.matrix.MatrixCallHangupEvent: when a called user exits the call.
platypush.message.event.matrix.MatrixStickerEvent: when a sticker is sent to a room.
platypush.message.event.matrix.MatrixEncryptedMessageEvent: |
when a message is received but the client doesn't
have the E2E keys to decrypt it, or encryption has not been enabled.

View file

@ -34,7 +34,7 @@ class MatrixProfileSchema(Schema):
avatar_url = fields.URL(
metadata={
'description': 'User avatar URL',
'example': 'mxc://matrix.platypush.tech/AbCdEfG0123456789',
'example': 'mxc://matrix.example.org/AbCdEfG0123456789',
}
)
@ -73,7 +73,7 @@ class MatrixRoomSchema(Schema):
attribute='room_avatar_url',
metadata={
'description': 'Room avatar URL',
'example': 'mxc://matrix.platypush.tech/AbCdEfG0123456789',
'example': 'mxc://matrix.example.org/AbCdEfG0123456789',
},
)
@ -157,3 +157,33 @@ class MatrixMyDeviceSchema(Schema):
'example': '2022-07-23T17:20:01.254223',
}
)
class MatrixDownloadedFileSchema(Schema):
url = fields.String(
metadata={
'description': 'Matrix URL of the original resource',
'example': 'mxc://matrix.example.org/YhQycHvFOvtiDDbEeWWtEhXx',
},
)
path = fields.String(
metadata={
'description': 'Local path where the file has been saved',
'example': '/home/user/Downloads/image.png',
}
)
content_type = fields.String(
metadata={
'description': 'Content type of the downloaded file',
'example': 'image/png',
}
)
size = fields.Int(
metadata={
'description': 'Length in bytes of the output file',
'example': 1024,
}
)