diff --git a/platypush/message/event/matrix.py b/platypush/message/event/matrix.py index bc3e6fdb0..f6e7a3ac0 100644 --- a/platypush/message/event/matrix.py +++ b/platypush/message/event/matrix.py @@ -1,11 +1,10 @@ -from abc import ABC from datetime import datetime from typing import Dict, Any from platypush.message.event import Event -class MatrixEvent(Event, ABC): +class MatrixEvent(Event): """ Base matrix event. """ @@ -55,6 +54,13 @@ class MatrixEvent(Event, ABC): super().__init__(*args, **evt_args, **kwargs) +class MatrixSyncEvent(MatrixEvent): + """ + Event triggered when the startup synchronization has been completed and the + plugin is ready to use. + """ + + class MatrixMessageEvent(MatrixEvent): """ Event triggered when a message is received on a subscribed room. diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py index 0ad35ef32..a673ae204 100644 --- a/platypush/plugins/matrix/__init__.py +++ b/platypush/plugins/matrix/__init__.py @@ -7,7 +7,7 @@ import pathlib import re from dataclasses import dataclass -from typing import Coroutine +from typing import Collection, Coroutine, Dict from async_lru import alru_cache from nio import ( @@ -46,6 +46,7 @@ from nio import ( ) from nio.client.async_client import client_session +from nio.crypto.device import OlmDevice from nio.exceptions import OlmUnverifiedDeviceError from platypush.config import Config @@ -64,12 +65,14 @@ from platypush.message.event.matrix import ( MatrixRoomLeaveEvent, MatrixRoomTopicChangedEvent, MatrixStickerEvent, + MatrixSyncEvent, ) from platypush.plugins import AsyncRunnablePlugin, action from platypush.schemas.matrix import ( MatrixDeviceSchema, MatrixEventIdSchema, + MatrixMyDeviceSchema, MatrixProfileSchema, MatrixRoomSchema, ) @@ -101,6 +104,10 @@ class MatrixClient(AsyncClient): store_path: str | None = None, config: AsyncClientConfig | None = None, autojoin_on_invite=True, + autotrust_devices=False, + autotrust_devices_whitelist: Collection[str] | None = None, + autotrust_rooms_whitelist: Collection[str] | None = None, + autotrust_users_whitelist: Collection[str] | None = None, **kwargs, ): credentials_file = os.path.abspath(os.path.expanduser(credentials_file)) @@ -122,6 +129,10 @@ class MatrixClient(AsyncClient): self.logger = logging.getLogger(self.__class__.__name__) self._credentials_file = credentials_file self._autojoin_on_invite = autojoin_on_invite + self._autotrust_devices = autotrust_devices + self._autotrust_devices_whitelist = autotrust_devices_whitelist + self._autotrust_rooms_whitelist = autotrust_rooms_whitelist or set() + self._autotrust_users_whitelist = autotrust_users_whitelist or set() self._first_sync_performed = asyncio.Event() async def _autojoin_room_callback(self, room: MatrixRoom, *_): @@ -204,18 +215,104 @@ class MatrixClient(AsyncClient): self.logger.info('Uploading encryption keys') await self.keys_upload() - self.logger.info('Synchronizing rooms') + self.logger.info('Synchronizing state') self._first_sync_performed.clear() sync_token = self.loaded_sync_token self.loaded_sync_token = '' await self.sync(sync_filter={'room': {'timeline': {'limit': 1}}}) - self._add_callbacks() - self.loaded_sync_token = sync_token + + self._add_callbacks() + self._sync_devices_trust() self._first_sync_performed.set() - self.logger.info('Rooms synchronized') + + get_bus().post(MatrixSyncEvent(server_url=self.homeserver)) + self.logger.info('State synchronized') return login_res + def _sync_devices_trust(self): + all_devices = self.get_devices() + devices_to_trust: Dict[str, OlmDevice] = {} + untrusted_devices = { + device_id: device + for device_id, device in all_devices.items() + if not device.verified + } + + if self._autotrust_devices: + devices_to_trust.update(untrusted_devices) + else: + if self._autotrust_devices_whitelist: + devices_to_trust.update( + { + device_id: device + for device_id, device in all_devices.items() + if device_id in self._autotrust_devices_whitelist + and device_id in untrusted_devices + } + ) + if self._autotrust_rooms_whitelist: + devices_to_trust.update( + { + device_id: device + for room_id, devices in self.get_devices_by_room().items() + for device_id, device in devices.items() # type: ignore + if room_id in self._autotrust_rooms_whitelist + and device_id in untrusted_devices + } + ) + if self._autotrust_users_whitelist: + devices_to_trust.update( + { + device_id: device + for user_id, devices in self.get_devices_by_user().items() + for device_id, device in devices.items() # type: ignore + if user_id in self._autotrust_users_whitelist + and device_id in untrusted_devices + } + ) + + for device in devices_to_trust.values(): + self.verify_device(device) + self.logger.info( + 'Device %s by user %s added to the whitelist', device.id, device.user_id + ) + + def get_devices_by_user( + self, user_id: str | None = None + ) -> Dict[str, Dict[str, OlmDevice]] | Dict[str, OlmDevice]: + devices = {user: devices for user, devices in self.device_store.items()} + + if user_id: + devices = devices.get(user_id, {}) + return devices + + def get_devices(self) -> Dict[str, OlmDevice]: + return { + device_id: device + for _, devices in self.device_store.items() + for device_id, device in devices.items() + } + + def get_device(self, device_id: str) -> OlmDevice | None: + return self.get_devices().get(device_id) + + def get_devices_by_room( + self, room_id: str | None = None + ) -> Dict[str, Dict[str, OlmDevice]] | Dict[str, OlmDevice]: + devices = { + room_id: { + device_id: device + for _, devices in self.room_devices(room_id).items() + for device_id, device in devices.items() + } + for room_id in self.rooms.keys() + } + + if room_id: + devices = devices.get(room_id, {}) + return devices + def _add_callbacks(self): self.add_event_callback(self._event_catch_all, Event) self.add_event_callback(self._on_invite, InviteNameEvent) # type: ignore @@ -526,8 +623,8 @@ class MatrixPlugin(AsyncRunnablePlugin): ``pacman -S libolm``) * **async_lru** (``pip install async_lru``) - Note that ``libolm`` and the ``[e2e]`` module are only required if you want E2E encryption - support. + Note that ``libolm`` and the ``[e2e]`` module are only required if you want + E2E encryption support. Unless you configure the extension to use the token of an existing trusted device, it is recommended that you mark the virtual device used by this @@ -555,19 +652,34 @@ class MatrixPlugin(AsyncRunnablePlugin): Triggers: - * :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received. - * :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when a media message is received. - * :class:`platypush.message.event.matrix.MatrixRoomCreatedEvent`: when a room is created. - * :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a user joins a room. - * :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a user leaves a room. - * :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when the user is invited to a room. - * :class:`platypush.message.event.matrix.MatrixRoomTopicChangedEvent`: when the topic/title of a room changes. - * :class:`platypush.message.event.matrix.MatrixCallInviteEvent`: when the user is invited to a call. - * :class:`platypush.message.event.matrix.MatrixCallAnswerEvent`: when a called user wishes to pick the call. - * :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a called user exits the call. - * :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a sticker is sent to a room. - * :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`: when a message is received but the - client doesn't have the E2E keys to decrypt it, or encryption has not been enabled. + * :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a + message is received. + * :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when + a media message is received. + * :class:`platypush.message.event.matrix.MatrixSyncEvent`: when the + startup synchronization has been completed and the plugin is ready to + use. + * :class:`platypush.message.event.matrix.MatrixRoomCreatedEvent`: when + a room is created. + * :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a + user joins a room. + * :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a + user leaves a room. + * :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when + the user is invited to a room. + * :class:`platypush.message.event.matrix.MatrixRoomTopicChangedEvent`: + when the topic/title of a room changes. + * :class:`platypush.message.event.matrix.MatrixCallInviteEvent`: when + the user is invited to a call. + * :class:`platypush.message.event.matrix.MatrixCallAnswerEvent`: when a + called user wishes to pick the call. + * :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a + called user exits the call. + * :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a + sticker is sent to a room. + * :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`: + when a message is received but the client doesn't have the E2E keys + to decrypt it, or encryption has not been enabled. """ @@ -580,6 +692,10 @@ class MatrixPlugin(AsyncRunnablePlugin): device_name: str | None = 'platypush', device_id: str | None = None, autojoin_on_invite: bool = True, + autotrust_devices: bool = False, + autotrust_devices_whitelist: Collection[str] | None = None, + autotrust_users_whitelist: Collection[str] | None = None, + autotrust_rooms_whitelist: Collection[str] | None = None, **kwargs, ): """ @@ -594,16 +710,31 @@ class MatrixPlugin(AsyncRunnablePlugin): the user has 2FA enabled. :param server_url: Default Matrix instance base URL (default: ``https://matrix.to``). - :param user_id: user_id, in the format ``@user:example.org``, or just the username if the - account is hosted on the same server configured in the ``server_url``. + :param user_id: user_id, in the format ``@user:example.org``, or just + the username if the account is hosted on the same server configured in + the ``server_url``. :param password: User password. :param access_token: User access token. :param device_name: The name of this device/connection (default: ``platypush``). :param device_id: Use an existing ``device_id`` for the sessions. - :param autojoin_on_invite: Whether the account should automatically join rooms - upon invite. If false, then you may want to implement your own - logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteEvent` - event is received, and call the :meth:`.join` method if required. + :param autojoin_on_invite: Whether the account should automatically + join rooms upon invite. If false, then you may want to implement your + own logic in an event hook when a + :class:`platypush.message.event.matrix.MatrixRoomInviteEvent` event is + received, and call the :meth:`.join` method if required. + :param autotrust_devices: If set to True, the plugin will automatically + trust the devices on encrypted rooms. Set this property to True + only if you only plan to use a bot on trusted rooms. Note that if + no automatic trust mechanism is set you may need to explicitly + create your logic for trusting users - either with a hook when + :class:`platypush.message.event.matrix.MatrixSyncEvent` is + received, or when a room is joined, or before sending a message. + :param autotrust_devices_whitelist: Automatically trust devices with IDs + IDs provided in this list. + :param autotrust_users_whitelist: Automatically trust devices from the + user IDs provided in this list. + :param autotrust_rooms_whitelist: Automatically trust devices on the + room IDs provided in this list. """ super().__init__(**kwargs) if not (server_url.startswith('http://') or server_url.startswith('https://')): @@ -614,57 +745,67 @@ class MatrixPlugin(AsyncRunnablePlugin): if user_id and not re.match(user_id, '^@[a-zA-Z0-9.-_]+:.+'): user_id = f'@{user_id}:{server_name}' - # self._matrix_proc: multiprocessing.Process | None = None self._user_id = user_id self._password = password self._access_token = access_token self._device_name = device_name self._device_id = device_id self._autojoin_on_invite = autojoin_on_invite + self._autotrust_devices = autotrust_devices + self._autotrust_devices_whitelist = set(autotrust_devices_whitelist or []) + self._autotrust_users_whitelist = set(autotrust_users_whitelist or []) + self._autotrust_rooms_whitelist = set(autotrust_rooms_whitelist or []) self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore self._credentials_file = os.path.join(self._workdir, 'credentials.json') self._processed_responses = {} self._client = self._get_client() pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True) - def _get_client(self) -> AsyncClient: + def _get_client(self) -> MatrixClient: return MatrixClient( homeserver=self._server_url, user=self._user_id, credentials_file=self._credentials_file, autojoin_on_invite=self._autojoin_on_invite, + autotrust_devices=self._autotrust_devices, + autotrust_devices_whitelist=self._autotrust_devices_whitelist, + autotrust_rooms_whitelist=self._autotrust_rooms_whitelist, + autotrust_users_whitelist=self._autotrust_users_whitelist, device_id=self._device_id, ) - async def _login(self) -> AsyncClient: + @property + def client(self) -> MatrixClient: if not self._client: self._client = self._get_client() + return self._client - await self._client.login( + async def _login(self) -> MatrixClient: + await self.client.login( password=self._password, device_name=self._device_name, token=self._access_token, ) - return self._client + return self.client async def listen(self): while not self.should_stop(): await self._login() - assert self._client try: - await self._client.sync_forever(timeout=30000, full_state=True) + await self.client.sync_forever(timeout=30000, full_state=True) except KeyboardInterrupt: pass finally: try: - await self._client.close() + await self.client.close() finally: self._client = None def _loop_execute(self, coro: Coroutine): assert self._loop, 'The loop is not running' + try: ret = asyncio.run_coroutine_threadsafe(coro, self._loop).result() except OlmUnverifiedDeviceError as e: @@ -694,24 +835,25 @@ class MatrixPlugin(AsyncRunnablePlugin): `image`. Default: `text`. :param tx_id: Unique transaction ID to associate to this message. :param ignore_unverified_devices: If true, unverified devices will be - ignored (default: False). + ignored. Otherwise, if the room is encrypted and it contains + devices that haven't been marked as trusted, the message + delivery may fail (default: False). :return: .. schema:: matrix.MatrixEventIdSchema """ - assert self._client, 'Client not connected' - assert self._loop, 'The loop is not running' - ret = self._loop_execute( - self._client.room_send( - message_type='m.' + message_type, + self.client.room_send( + message_type='m.room.message', room_id=room_id, tx_id=tx_id, ignore_unverified_devices=ignore_unverified_devices, content={ + 'msgtype': 'm.' + message_type, 'body': body, }, ) ) + assert self._loop ret = asyncio.run_coroutine_threadsafe( ret.transport_response.json(), self._loop ).result() @@ -726,8 +868,7 @@ class MatrixPlugin(AsyncRunnablePlugin): :param user_id: User ID. :return: .. schema:: matrix.MatrixProfileSchema """ - assert self._client, 'Client not connected' - profile = self._loop_execute(self._client.get_profile(user_id)) + profile = self._loop_execute(self.client.get_profile(user_id)) # type: ignore profile.user_id = user_id return MatrixProfileSchema().dump(profile) @@ -739,8 +880,7 @@ class MatrixPlugin(AsyncRunnablePlugin): :param room_id: room ID. :return: .. schema:: matrix.MatrixRoomSchema """ - assert self._client, 'Client not connected' - response = self._loop_execute(self._client.room_get_state(room_id)) + response = self._loop_execute(self.client.room_get_state(room_id)) # type: ignore assert not isinstance(response, RoomGetStateError), response.message room_args = {'room_id': room_id, 'own_user_id': None, 'encrypted': False} @@ -762,26 +902,32 @@ class MatrixPlugin(AsyncRunnablePlugin): return MatrixRoomSchema().dump(room) @action - def get_devices(self): + def get_my_devices(self): """ Get the list of devices associated to the current user. - :return: .. schema:: matrix.MatrixDeviceSchema(many=True) + :return: .. schema:: matrix.MatrixMyDeviceSchema(many=True) """ - assert self._client, 'Client not connected' - response = self._loop_execute(self._client.devices()) + response = self._loop_execute(self.client.devices()) assert not isinstance(response, DevicesError), response.message - return MatrixDeviceSchema().dump(response.devices, many=True) + return MatrixMyDeviceSchema().dump(response.devices, many=True) + + @action + def get_device(self, device_id: str): + """ + Get the info about a device given its ID. + + :return: .. schema:: matrix.MatrixDeviceSchema + """ + return MatrixDeviceSchema().dump(self._get_device(device_id)) @action def get_joined_rooms(self): """ Retrieve the rooms that the user has joined. """ - assert self._client, 'Client not connected' - response = self._loop_execute(self._client.joined_rooms()) + response = self._loop_execute(self.client.joined_rooms()) assert not isinstance(response, JoinedRoomsError), response.message - return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore @action @@ -789,8 +935,32 @@ class MatrixPlugin(AsyncRunnablePlugin): """ Synchronize the E2EE keys with the homeserver. """ - assert self._client, 'Client not connected' - self._loop_execute(self._client.keys_upload()) + self._loop_execute(self.client.keys_upload()) + + def _get_device(self, device_id: str): + device = self.client.get_device(device_id) + assert device, f'No such device_id: {device_id}' + return device + + @action + def trust_device(self, device_id: str): + """ + Mark a device as trusted. + + :param device_id: Device ID. + """ + device = self._get_device(device_id) + self.client.verify_device(device) + + @action + def untrust_device(self, device_id: str): + """ + Mark a device as untrusted. + + :param device_id: Device ID. + """ + device = self._get_device(device_id) + self.client.unverify_device(device) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/matrix/manifest.yaml b/platypush/plugins/matrix/manifest.yaml index d564aaaf1..3f0b5e5b2 100644 --- a/platypush/plugins/matrix/manifest.yaml +++ b/platypush/plugins/matrix/manifest.yaml @@ -2,6 +2,8 @@ manifest: events: platypush.message.event.matrix.MatrixMessageEvent: when a message is received. platypush.message.event.matrix.MatrixMediaMessageEvent: when a media message is received. + platypush.message.event.matrix.MatrixSyncEvent: when the startup + synchronization has been completed and the plugin is ready to use. platypush.message.event.matrix.MatrixRoomCreatedEvent: when a room is created. platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a room. platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a room. diff --git a/platypush/schemas/matrix.py b/platypush/schemas/matrix.py index aa7279f5a..b28a6cbf2 100644 --- a/platypush/schemas/matrix.py +++ b/platypush/schemas/matrix.py @@ -97,6 +97,46 @@ class MatrixDeviceSchema(Schema): }, ) + user_id = fields.String( + required=True, + metadata={ + 'description': 'User ID associated to the device', + 'example': '@myuser:matrix.example.org', + }, + ) + + display_name = fields.String( + metadata={ + 'description': 'Display name of the device', + 'example': 'Element Android', + }, + ) + + blacklisted = fields.Boolean() + deleted = fields.Boolean(default=False) + ignored = fields.Boolean() + verified = fields.Boolean() + + keys = fields.Dict( + metadata={ + 'description': 'Encryption keys supported by the device', + 'example': { + 'curve25519': 'BtlB0vaQmtYFsvOYkmxyzw9qP5yGjuAyRh4gXh3q', + 'ed25519': 'atohIK2FeVlYoY8xxpZ1bhDbveD+HA2DswNFqUxP', + }, + }, + ) + + +class MatrixMyDeviceSchema(Schema): + device_id = fields.String( + required=True, + attribute='id', + metadata={ + 'description': 'ABCDEFG', + }, + ) + display_name = fields.String( metadata={ 'description': 'Device display name',