Completing the Matrix plugin integration

Newly implemented actions:

- `get_messages`
- `get_room_members`
- `update_device`
- `delete_devices`
- `room_alias_to_id`
- `add_room_alias`
- `delete_room_alias`
- `kick`
- `ban`
- `unban`
- `forget`
This commit is contained in:
Fabio Manganiello 2022-08-28 12:26:27 +02:00
parent 0e3cabc5f6
commit e479ca7e3e
Signed by: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 392 additions and 5 deletions

View file

@ -41,6 +41,7 @@ from nio import (
RoomGetEventError,
RoomGetStateResponse,
RoomMemberEvent,
RoomMessage,
RoomMessageAudio,
RoomMessageFile,
RoomMessageFormatted,
@ -51,6 +52,7 @@ from nio import (
RoomTopicEvent,
RoomUpgradeEvent,
StickerEvent,
SyncResponse,
ToDeviceError,
UnknownEncryptedEvent,
UnknownEvent,
@ -58,13 +60,14 @@ from nio import (
import aiofiles
import aiofiles.os
from nio.api import RoomVisibility
from nio.api import MessageDirection, RoomVisibility
from nio.client.async_client import client_session
from nio.client.base_client import logged_in
from nio.crypto import decrypt_attachment
from nio.crypto.device import OlmDevice
from nio.exceptions import OlmUnverifiedDeviceError
from nio.responses import DownloadResponse
from nio.responses import DownloadResponse, RoomMessagesResponse
from platypush.config import Config
from platypush.context import get_bus
@ -92,6 +95,8 @@ from platypush.schemas.matrix import (
MatrixDeviceSchema,
MatrixDownloadedFileSchema,
MatrixEventIdSchema,
MatrixMemberSchema,
MatrixMessagesResponseSchema,
MatrixMyDeviceSchema,
MatrixProfileSchema,
MatrixRoomIdSchema,
@ -159,6 +164,7 @@ class MatrixClient(AsyncClient):
self._autotrust_rooms_whitelist = autotrust_rooms_whitelist or set()
self._autotrust_users_whitelist = autotrust_users_whitelist or set()
self._first_sync_performed = asyncio.Event()
self._last_batches_by_room = {}
self._encrypted_attachments_keystore_path = os.path.join(
store_path, 'attachment_keys.json'
@ -177,6 +183,7 @@ class MatrixClient(AsyncClient):
pathlib.Path(self._encrypted_attachments_keystore_path).touch(
mode=0o600, exist_ok=True
)
self._encrypted_attachments_keystore = {
tuple(key.split('|')): data for key, data in keystore.items()
}
@ -276,6 +283,37 @@ class MatrixClient(AsyncClient):
self.logger.info('State synchronized')
return login_res
@logged_in
async def sync(self, *args, **kwargs) -> SyncResponse:
response = await super().sync(*args, **kwargs)
assert isinstance(response, SyncResponse), str(response)
self._last_batches_by_room.update(
{
room_id: {
'prev_batch': room.timeline.prev_batch,
'next_batch': response.next_batch,
}
for room_id, room in response.rooms.join.items()
}
)
return response
@logged_in
async def room_messages(
self, room_id: str, start: str | None = None, *args, **kwargs
) -> RoomMessagesResponse:
if not start:
start = self._last_batches_by_room.get(room_id, {}).get('prev_batch')
assert start, (
f'No sync batches were found for room {room_id} and no start'
'batch has been provided'
)
response = await super().room_messages(room_id, start, *args, **kwargs)
assert isinstance(response, RoomMessagesResponse), str(response)
return response
def _sync_devices_trust(self):
all_devices = self.get_devices()
devices_to_trust: Dict[str, OlmDevice] = {}
@ -425,6 +463,7 @@ class MatrixClient(AsyncClient):
), f'Could not retrieve profile for room {room_id}: {ret.message}'
return ret
@client_session
async def download(
self,
server_name: str,
@ -1125,6 +1164,44 @@ class MatrixPlugin(AsyncRunnablePlugin):
setattr(room, k, v)
return MatrixRoomSchema().dump(room)
@action
def get_messages(
self,
room_id: str,
start: str | None = None,
end: str | None = None,
backwards: bool = True,
limit: int = 10,
):
"""
Retrieve a list of messages from a room.
:param room_id: Room ID.
:param start: Start retrieving messages from this batch ID (default:
latest batch returned from a call to ``sync``).
:param end: Retrieving messages until this batch ID.
:param backwards: Set to True if you want to retrieve messages starting
from the most recent, in descending order (default). Otherwise, the
first returned message will be the oldest and messages will be
returned in ascending order.
:param limit: Maximum number of messages to be returned (default: 10).
# :return: .. schema:: matrix.MatrixMessagesResponseSchema
"""
response = self._loop_execute(
self.client.room_messages(
room_id,
start=start,
end=end,
limit=limit,
direction=(
MessageDirection.back if backwards else MessageDirection.front
),
)
)
response.chunk = [m for m in response.chunk if isinstance(m, RoomMessage)]
return MatrixMessagesResponseSchema().dump(response)
@action
def get_my_devices(self):
"""
@ -1144,6 +1221,44 @@ class MatrixPlugin(AsyncRunnablePlugin):
"""
return MatrixDeviceSchema().dump(self._get_device(device_id))
@action
def update_device(self, device_id: str, display_name: str | None = None):
"""
Update information about a user's device.
:param display_name: New display name.
:return: .. schema:: matrix.MatrixDeviceSchema
"""
content = {}
if display_name:
content['display_name'] = display_name
self._loop_execute(self.client.update_device(device_id, content))
return MatrixDeviceSchema().dump(self._get_device(device_id))
@action
def delete_devices(
self,
devices: Sequence[str],
username: str | None = None,
password: str | None = None,
):
"""
Delete a list of devices from the user's authorized list and invalidate
their access tokens.
:param devices: List of devices that should be deleted.
:param username: Username, if the server requires authentication upon
device deletion.
:param password: User password, if the server requires authentication
upon device deletion.
"""
auth = {}
if username and password:
auth = {'type': 'm.login.password', 'user': username, 'password': password}
self._loop_execute(self.client.delete_devices([*devices], auth=auth))
@action
def get_joined_rooms(self):
"""
@ -1154,6 +1269,48 @@ class MatrixPlugin(AsyncRunnablePlugin):
response = self._loop_execute(self.client.joined_rooms())
return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore
@action
def get_room_members(self, room_id: str):
"""
Retrieve the list of users joined into a room.
:param room_id: The room ID.
:return: .. schema:: matrix.MatrixMemberSchema(many=True)
"""
response = self._loop_execute(self.client.joined_members(room_id))
return MatrixMemberSchema().dump(response.members, many=True)
@action
def room_alias_to_id(self, alias: str) -> str:
"""
Convert a room alias (in the format ``#alias:matrix.example.org``) to a
room ID (in the format ``!aBcDeFgHiJkMnO:matrix.example.org').
:param alias: The room alias.
:return: The room ID, as a string.
"""
response = self._loop_execute(self.client.room_resolve_alias(alias))
return response.room_id
@action
def add_room_alias(self, room_id: str, alias: str):
"""
Add an alias to a room.
:param room_id: An existing room ID.
:param alias: The room alias.
"""
self._loop_execute(self.client.room_put_alias(alias, room_id))
@action
def delete_room_alias(self, alias: str):
"""
Delete a room alias.
:param alias: The room alias.
"""
self._loop_execute(self.client.room_delete_alias(alias))
@action
def upload_keys(self):
"""
@ -1342,7 +1499,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
return MatrixRoomIdSchema().dump(rs)
@action
def invite_to_room(self, room_id: str, user_id: str):
def invite(self, room_id: str, user_id: str):
"""
Invite a user to a room.
@ -1352,7 +1509,39 @@ class MatrixPlugin(AsyncRunnablePlugin):
self._loop_execute(self.client.room_invite(room_id, user_id))
@action
def join_room(self, room_id: str):
def kick(self, room_id: str, user_id: str, reason: str | None = None):
"""
Kick a user out of a room.
:param room_id: Room ID.
:param user_id: User ID.
:param reason: Optional reason.
"""
self._loop_execute(self.client.room_kick(room_id, user_id, reason))
@action
def ban(self, room_id: str, user_id: str, reason: str | None = None):
"""
Ban a user from a room.
:param room_id: Room ID.
:param user_id: User ID.
:param reason: Optional reason.
"""
self._loop_execute(self.client.room_ban(room_id, user_id, reason))
@action
def unban(self, room_id: str, user_id: str):
"""
Remove a user ban from a room.
:param room_id: Room ID.
:param user_id: User ID.
"""
self._loop_execute(self.client.room_unban(room_id, user_id))
@action
def join(self, room_id: str):
"""
Join a room.
@ -1361,7 +1550,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
self._loop_execute(self.client.join(room_id))
@action
def leave_room(self, room_id: str):
def leave(self, room_id: str):
"""
Leave a joined room.
@ -1369,5 +1558,17 @@ class MatrixPlugin(AsyncRunnablePlugin):
"""
self._loop_execute(self.client.room_leave(room_id))
@action
def forget(self, room_id: str):
"""
Leave a joined room and forget its data as well as all the messages.
If all the users leave a room, that room will be marked for deletion by
the homeserver.
:param room_id: Room ID.
"""
self._loop_execute(self.client.room_forget(room_id))
# vim:sw=4:ts=4:et:

View file

@ -4,6 +4,14 @@ from marshmallow.schema import Schema
from platypush.schemas import DateTime
class MillisecondsTimestamp(DateTime):
def _get_attr(self, *args, **kwargs):
value = super()._get_attr(*args, **kwargs)
if isinstance(value, int):
value = float(value / 1000)
return value
class MatrixEventIdSchema(Schema):
event_id = fields.String(
required=True,
@ -49,6 +57,12 @@ class MatrixProfileSchema(Schema):
)
class MatrixMemberSchema(MatrixProfileSchema):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.fields['display_name'].attribute = 'display_name'
class MatrixRoomSchema(Schema):
room_id = fields.String(
required=True,
@ -197,3 +211,175 @@ class MatrixDownloadedFileSchema(Schema):
'example': 1024,
}
)
class MatrixMessageSchema(Schema):
event_id = fields.String(
required=True,
metadata={
'description': 'Event ID associated to this message',
'example': '$2eOQ5ueafANj91GnPCRkRUOOjM7dI5kFDOlfMNCD2ly',
},
)
room_id = fields.String(
required=True,
metadata={
'description': 'The ID of the room containing the message',
'example': '!aBcDeFgHiJkMnO:matrix.example.org',
},
)
user_id = fields.String(
required=True,
attribute='sender',
metadata={
'description': 'ID of the user who sent the message',
'example': '@myuser:matrix.example.org',
},
)
body = fields.String(
required=True,
metadata={
'description': 'Message body',
'example': 'Hello world!',
},
)
format = fields.String(
metadata={
'description': 'Message format',
'example': 'markdown',
},
)
formatted_body = fields.String(
metadata={
'description': 'Formatted body',
'example': '**Hello world!**',
},
)
url = fields.String(
metadata={
'description': 'mxc:// URL if this message contains an attachment',
'example': 'mxc://matrix.example.org/oarGdlpvcwppARPjzNlmlXkD',
},
)
content_type = fields.String(
attribute='mimetype',
metadata={
'description': 'If the message contains an attachment, this field '
'will contain its MIME type',
'example': 'image/jpeg',
},
)
transaction_id = fields.String(
metadata={
'description': 'Set if this message a unique transaction_id associated',
'example': 'mQ8hZR6Dx8I8YDMwONYmBkf7lTgJSMV/ZPqosDNM',
},
)
decrypted = fields.Bool(
metadata={
'description': 'True if the message was encrypted and has been '
'successfully decrypted',
},
)
verified = fields.Bool(
metadata={
'description': 'True if this is an encrypted message coming from a '
'verified source'
},
)
hashes = fields.Dict(
metadata={
'description': 'If the message has been decrypted, this field '
'contains a mapping of its hashes',
'example': {'sha256': 'yoQLQwcURq6/bJp1xQ/uhn9Z2xeA27KhMhPd/mfT8tR'},
},
)
iv = fields.String(
metadata={
'description': 'If the message has been decrypted, this field '
'contains the encryption initial value',
'example': 'NqJMMdijlLvAAAAAAAAAAA',
},
)
key = fields.Dict(
metadata={
'description': 'If the message has been decrypted, this field '
'contains the encryption/decryption key',
'example': {
'alg': 'A256CTR',
'ext': True,
'k': 'u6jjAyNvJoBHE55P5ZfvX49m3oSt9s_L4PSQdprRSJI',
'key_ops': ['encrypt', 'decrypt'],
'kty': 'oct',
},
},
)
timestamp = MillisecondsTimestamp(
required=True,
attribute='server_timestamp',
metadata={
'description': 'When the event was registered on the server',
'example': '2022-07-23T17:20:01.254223',
},
)
class MatrixMessagesResponseSchema(Schema):
messages = fields.Nested(
MatrixMessageSchema(),
many=True,
required=True,
attribute='chunk',
)
start = fields.String(
required=True,
nullable=True,
metadata={
'description': 'Pointer to the first message. It can be used as a '
'``start``/``end`` for another ``get_messages`` query.',
'example': 's10226_143893_619_3648_5951_5_555_7501_0',
},
)
end = fields.String(
required=True,
nullable=True,
metadata={
'description': 'Pointer to the last message. It can be used as a '
'``start``/``end`` for another ``get_messages`` query.',
'example': 't2-10202_143892_626_3663_5949_6_558_7501_0',
},
)
start_time = MillisecondsTimestamp(
required=True,
nullable=True,
metadata={
'description': 'The oldest timestamp of the returned messages',
'example': '2022-07-23T16:20:01.254223',
},
)
end_time = MillisecondsTimestamp(
required=True,
nullable=True,
metadata={
'description': 'The newest timestamp of the returned messages',
'example': '2022-07-23T18:20:01.254223',
},
)