diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py index 76dba6336d..ee8a252d1d 100644 --- a/platypush/plugins/matrix/__init__.py +++ b/platypush/plugins/matrix/__init__.py @@ -41,6 +41,7 @@ from nio import ( RoomGetEventError, RoomGetStateResponse, RoomMemberEvent, + RoomMessage, RoomMessageAudio, RoomMessageFile, RoomMessageFormatted, @@ -51,6 +52,7 @@ from nio import ( RoomTopicEvent, RoomUpgradeEvent, StickerEvent, + SyncResponse, ToDeviceError, UnknownEncryptedEvent, UnknownEvent, @@ -58,13 +60,14 @@ from nio import ( import aiofiles import aiofiles.os -from nio.api import RoomVisibility +from nio.api import MessageDirection, RoomVisibility from nio.client.async_client import client_session +from nio.client.base_client import logged_in from nio.crypto import decrypt_attachment from nio.crypto.device import OlmDevice from nio.exceptions import OlmUnverifiedDeviceError -from nio.responses import DownloadResponse +from nio.responses import DownloadResponse, RoomMessagesResponse from platypush.config import Config from platypush.context import get_bus @@ -92,6 +95,8 @@ from platypush.schemas.matrix import ( MatrixDeviceSchema, MatrixDownloadedFileSchema, MatrixEventIdSchema, + MatrixMemberSchema, + MatrixMessagesResponseSchema, MatrixMyDeviceSchema, MatrixProfileSchema, MatrixRoomIdSchema, @@ -159,6 +164,7 @@ class MatrixClient(AsyncClient): self._autotrust_rooms_whitelist = autotrust_rooms_whitelist or set() self._autotrust_users_whitelist = autotrust_users_whitelist or set() self._first_sync_performed = asyncio.Event() + self._last_batches_by_room = {} self._encrypted_attachments_keystore_path = os.path.join( store_path, 'attachment_keys.json' @@ -177,6 +183,7 @@ class MatrixClient(AsyncClient): pathlib.Path(self._encrypted_attachments_keystore_path).touch( mode=0o600, exist_ok=True ) + self._encrypted_attachments_keystore = { tuple(key.split('|')): data for key, data in keystore.items() } @@ -276,6 +283,37 @@ class MatrixClient(AsyncClient): self.logger.info('State synchronized') return login_res + @logged_in + async def sync(self, *args, **kwargs) -> SyncResponse: + response = await super().sync(*args, **kwargs) + assert isinstance(response, SyncResponse), str(response) + self._last_batches_by_room.update( + { + room_id: { + 'prev_batch': room.timeline.prev_batch, + 'next_batch': response.next_batch, + } + for room_id, room in response.rooms.join.items() + } + ) + + return response + + @logged_in + async def room_messages( + self, room_id: str, start: str | None = None, *args, **kwargs + ) -> RoomMessagesResponse: + if not start: + start = self._last_batches_by_room.get(room_id, {}).get('prev_batch') + assert start, ( + f'No sync batches were found for room {room_id} and no start' + 'batch has been provided' + ) + + response = await super().room_messages(room_id, start, *args, **kwargs) + assert isinstance(response, RoomMessagesResponse), str(response) + return response + def _sync_devices_trust(self): all_devices = self.get_devices() devices_to_trust: Dict[str, OlmDevice] = {} @@ -425,6 +463,7 @@ class MatrixClient(AsyncClient): ), f'Could not retrieve profile for room {room_id}: {ret.message}' return ret + @client_session async def download( self, server_name: str, @@ -1125,6 +1164,44 @@ class MatrixPlugin(AsyncRunnablePlugin): setattr(room, k, v) return MatrixRoomSchema().dump(room) + @action + def get_messages( + self, + room_id: str, + start: str | None = None, + end: str | None = None, + backwards: bool = True, + limit: int = 10, + ): + """ + Retrieve a list of messages from a room. + + :param room_id: Room ID. + :param start: Start retrieving messages from this batch ID (default: + latest batch returned from a call to ``sync``). + :param end: Retrieving messages until this batch ID. + :param backwards: Set to True if you want to retrieve messages starting + from the most recent, in descending order (default). Otherwise, the + first returned message will be the oldest and messages will be + returned in ascending order. + :param limit: Maximum number of messages to be returned (default: 10). + # :return: .. schema:: matrix.MatrixMessagesResponseSchema + """ + response = self._loop_execute( + self.client.room_messages( + room_id, + start=start, + end=end, + limit=limit, + direction=( + MessageDirection.back if backwards else MessageDirection.front + ), + ) + ) + + response.chunk = [m for m in response.chunk if isinstance(m, RoomMessage)] + return MatrixMessagesResponseSchema().dump(response) + @action def get_my_devices(self): """ @@ -1144,6 +1221,44 @@ class MatrixPlugin(AsyncRunnablePlugin): """ return MatrixDeviceSchema().dump(self._get_device(device_id)) + @action + def update_device(self, device_id: str, display_name: str | None = None): + """ + Update information about a user's device. + + :param display_name: New display name. + :return: .. schema:: matrix.MatrixDeviceSchema + """ + content = {} + if display_name: + content['display_name'] = display_name + + self._loop_execute(self.client.update_device(device_id, content)) + return MatrixDeviceSchema().dump(self._get_device(device_id)) + + @action + def delete_devices( + self, + devices: Sequence[str], + username: str | None = None, + password: str | None = None, + ): + """ + Delete a list of devices from the user's authorized list and invalidate + their access tokens. + + :param devices: List of devices that should be deleted. + :param username: Username, if the server requires authentication upon + device deletion. + :param password: User password, if the server requires authentication + upon device deletion. + """ + auth = {} + if username and password: + auth = {'type': 'm.login.password', 'user': username, 'password': password} + + self._loop_execute(self.client.delete_devices([*devices], auth=auth)) + @action def get_joined_rooms(self): """ @@ -1154,6 +1269,48 @@ class MatrixPlugin(AsyncRunnablePlugin): response = self._loop_execute(self.client.joined_rooms()) return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore + @action + def get_room_members(self, room_id: str): + """ + Retrieve the list of users joined into a room. + + :param room_id: The room ID. + :return: .. schema:: matrix.MatrixMemberSchema(many=True) + """ + response = self._loop_execute(self.client.joined_members(room_id)) + return MatrixMemberSchema().dump(response.members, many=True) + + @action + def room_alias_to_id(self, alias: str) -> str: + """ + Convert a room alias (in the format ``#alias:matrix.example.org``) to a + room ID (in the format ``!aBcDeFgHiJkMnO:matrix.example.org'). + + :param alias: The room alias. + :return: The room ID, as a string. + """ + response = self._loop_execute(self.client.room_resolve_alias(alias)) + return response.room_id + + @action + def add_room_alias(self, room_id: str, alias: str): + """ + Add an alias to a room. + + :param room_id: An existing room ID. + :param alias: The room alias. + """ + self._loop_execute(self.client.room_put_alias(alias, room_id)) + + @action + def delete_room_alias(self, alias: str): + """ + Delete a room alias. + + :param alias: The room alias. + """ + self._loop_execute(self.client.room_delete_alias(alias)) + @action def upload_keys(self): """ @@ -1342,7 +1499,7 @@ class MatrixPlugin(AsyncRunnablePlugin): return MatrixRoomIdSchema().dump(rs) @action - def invite_to_room(self, room_id: str, user_id: str): + def invite(self, room_id: str, user_id: str): """ Invite a user to a room. @@ -1352,7 +1509,39 @@ class MatrixPlugin(AsyncRunnablePlugin): self._loop_execute(self.client.room_invite(room_id, user_id)) @action - def join_room(self, room_id: str): + def kick(self, room_id: str, user_id: str, reason: str | None = None): + """ + Kick a user out of a room. + + :param room_id: Room ID. + :param user_id: User ID. + :param reason: Optional reason. + """ + self._loop_execute(self.client.room_kick(room_id, user_id, reason)) + + @action + def ban(self, room_id: str, user_id: str, reason: str | None = None): + """ + Ban a user from a room. + + :param room_id: Room ID. + :param user_id: User ID. + :param reason: Optional reason. + """ + self._loop_execute(self.client.room_ban(room_id, user_id, reason)) + + @action + def unban(self, room_id: str, user_id: str): + """ + Remove a user ban from a room. + + :param room_id: Room ID. + :param user_id: User ID. + """ + self._loop_execute(self.client.room_unban(room_id, user_id)) + + @action + def join(self, room_id: str): """ Join a room. @@ -1361,7 +1550,7 @@ class MatrixPlugin(AsyncRunnablePlugin): self._loop_execute(self.client.join(room_id)) @action - def leave_room(self, room_id: str): + def leave(self, room_id: str): """ Leave a joined room. @@ -1369,5 +1558,17 @@ class MatrixPlugin(AsyncRunnablePlugin): """ self._loop_execute(self.client.room_leave(room_id)) + @action + def forget(self, room_id: str): + """ + Leave a joined room and forget its data as well as all the messages. + + If all the users leave a room, that room will be marked for deletion by + the homeserver. + + :param room_id: Room ID. + """ + self._loop_execute(self.client.room_forget(room_id)) + # vim:sw=4:ts=4:et: diff --git a/platypush/schemas/matrix.py b/platypush/schemas/matrix.py index 1aebba52d6..cb9ecc2390 100644 --- a/platypush/schemas/matrix.py +++ b/platypush/schemas/matrix.py @@ -4,6 +4,14 @@ from marshmallow.schema import Schema from platypush.schemas import DateTime +class MillisecondsTimestamp(DateTime): + def _get_attr(self, *args, **kwargs): + value = super()._get_attr(*args, **kwargs) + if isinstance(value, int): + value = float(value / 1000) + return value + + class MatrixEventIdSchema(Schema): event_id = fields.String( required=True, @@ -49,6 +57,12 @@ class MatrixProfileSchema(Schema): ) +class MatrixMemberSchema(MatrixProfileSchema): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields['display_name'].attribute = 'display_name' + + class MatrixRoomSchema(Schema): room_id = fields.String( required=True, @@ -197,3 +211,175 @@ class MatrixDownloadedFileSchema(Schema): 'example': 1024, } ) + + +class MatrixMessageSchema(Schema): + event_id = fields.String( + required=True, + metadata={ + 'description': 'Event ID associated to this message', + 'example': '$2eOQ5ueafANj91GnPCRkRUOOjM7dI5kFDOlfMNCD2ly', + }, + ) + + room_id = fields.String( + required=True, + metadata={ + 'description': 'The ID of the room containing the message', + 'example': '!aBcDeFgHiJkMnO:matrix.example.org', + }, + ) + + user_id = fields.String( + required=True, + attribute='sender', + metadata={ + 'description': 'ID of the user who sent the message', + 'example': '@myuser:matrix.example.org', + }, + ) + + body = fields.String( + required=True, + metadata={ + 'description': 'Message body', + 'example': 'Hello world!', + }, + ) + + format = fields.String( + metadata={ + 'description': 'Message format', + 'example': 'markdown', + }, + ) + + formatted_body = fields.String( + metadata={ + 'description': 'Formatted body', + 'example': '**Hello world!**', + }, + ) + + url = fields.String( + metadata={ + 'description': 'mxc:// URL if this message contains an attachment', + 'example': 'mxc://matrix.example.org/oarGdlpvcwppARPjzNlmlXkD', + }, + ) + + content_type = fields.String( + attribute='mimetype', + metadata={ + 'description': 'If the message contains an attachment, this field ' + 'will contain its MIME type', + 'example': 'image/jpeg', + }, + ) + + transaction_id = fields.String( + metadata={ + 'description': 'Set if this message a unique transaction_id associated', + 'example': 'mQ8hZR6Dx8I8YDMwONYmBkf7lTgJSMV/ZPqosDNM', + }, + ) + + decrypted = fields.Bool( + metadata={ + 'description': 'True if the message was encrypted and has been ' + 'successfully decrypted', + }, + ) + + verified = fields.Bool( + metadata={ + 'description': 'True if this is an encrypted message coming from a ' + 'verified source' + }, + ) + + hashes = fields.Dict( + metadata={ + 'description': 'If the message has been decrypted, this field ' + 'contains a mapping of its hashes', + 'example': {'sha256': 'yoQLQwcURq6/bJp1xQ/uhn9Z2xeA27KhMhPd/mfT8tR'}, + }, + ) + + iv = fields.String( + metadata={ + 'description': 'If the message has been decrypted, this field ' + 'contains the encryption initial value', + 'example': 'NqJMMdijlLvAAAAAAAAAAA', + }, + ) + + key = fields.Dict( + metadata={ + 'description': 'If the message has been decrypted, this field ' + 'contains the encryption/decryption key', + 'example': { + 'alg': 'A256CTR', + 'ext': True, + 'k': 'u6jjAyNvJoBHE55P5ZfvX49m3oSt9s_L4PSQdprRSJI', + 'key_ops': ['encrypt', 'decrypt'], + 'kty': 'oct', + }, + }, + ) + + timestamp = MillisecondsTimestamp( + required=True, + attribute='server_timestamp', + metadata={ + 'description': 'When the event was registered on the server', + 'example': '2022-07-23T17:20:01.254223', + }, + ) + + +class MatrixMessagesResponseSchema(Schema): + messages = fields.Nested( + MatrixMessageSchema(), + many=True, + required=True, + attribute='chunk', + ) + + start = fields.String( + required=True, + nullable=True, + metadata={ + 'description': 'Pointer to the first message. It can be used as a ' + '``start``/``end`` for another ``get_messages`` query.', + 'example': 's10226_143893_619_3648_5951_5_555_7501_0', + }, + ) + + end = fields.String( + required=True, + nullable=True, + metadata={ + 'description': 'Pointer to the last message. It can be used as a ' + '``start``/``end`` for another ``get_messages`` query.', + 'example': 't2-10202_143892_626_3663_5949_6_558_7501_0', + }, + ) + + start_time = MillisecondsTimestamp( + required=True, + nullable=True, + metadata={ + 'description': 'The oldest timestamp of the returned messages', + 'example': '2022-07-23T16:20:01.254223', + }, + ) + + end_time = MillisecondsTimestamp( + required=True, + nullable=True, + metadata={ + 'description': 'The newest timestamp of the returned messages', + 'example': '2022-07-23T18:20:01.254223', + }, + )