More granular control over trusted devices, and added global synchronization event

This commit is contained in:
Fabio Manganiello 2022-08-25 00:34:01 +02:00
parent 550f026e13
commit e4eb4cd7dc
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
4 changed files with 274 additions and 56 deletions

View file

@ -1,11 +1,10 @@
from abc import ABC
from datetime import datetime
from typing import Dict, Any
from platypush.message.event import Event
class MatrixEvent(Event, ABC):
class MatrixEvent(Event):
"""
Base matrix event.
"""
@ -55,6 +54,13 @@ class MatrixEvent(Event, ABC):
super().__init__(*args, **evt_args, **kwargs)
class MatrixSyncEvent(MatrixEvent):
"""
Event triggered when the startup synchronization has been completed and the
plugin is ready to use.
"""
class MatrixMessageEvent(MatrixEvent):
"""
Event triggered when a message is received on a subscribed room.

View file

@ -7,7 +7,7 @@ import pathlib
import re
from dataclasses import dataclass
from typing import Coroutine
from typing import Collection, Coroutine, Dict
from async_lru import alru_cache
from nio import (
@ -46,6 +46,7 @@ from nio import (
)
from nio.client.async_client import client_session
from nio.crypto.device import OlmDevice
from nio.exceptions import OlmUnverifiedDeviceError
from platypush.config import Config
@ -64,12 +65,14 @@ from platypush.message.event.matrix import (
MatrixRoomLeaveEvent,
MatrixRoomTopicChangedEvent,
MatrixStickerEvent,
MatrixSyncEvent,
)
from platypush.plugins import AsyncRunnablePlugin, action
from platypush.schemas.matrix import (
MatrixDeviceSchema,
MatrixEventIdSchema,
MatrixMyDeviceSchema,
MatrixProfileSchema,
MatrixRoomSchema,
)
@ -101,6 +104,10 @@ class MatrixClient(AsyncClient):
store_path: str | None = None,
config: AsyncClientConfig | None = None,
autojoin_on_invite=True,
autotrust_devices=False,
autotrust_devices_whitelist: Collection[str] | None = None,
autotrust_rooms_whitelist: Collection[str] | None = None,
autotrust_users_whitelist: Collection[str] | None = None,
**kwargs,
):
credentials_file = os.path.abspath(os.path.expanduser(credentials_file))
@ -122,6 +129,10 @@ class MatrixClient(AsyncClient):
self.logger = logging.getLogger(self.__class__.__name__)
self._credentials_file = credentials_file
self._autojoin_on_invite = autojoin_on_invite
self._autotrust_devices = autotrust_devices
self._autotrust_devices_whitelist = autotrust_devices_whitelist
self._autotrust_rooms_whitelist = autotrust_rooms_whitelist or set()
self._autotrust_users_whitelist = autotrust_users_whitelist or set()
self._first_sync_performed = asyncio.Event()
async def _autojoin_room_callback(self, room: MatrixRoom, *_):
@ -204,18 +215,104 @@ class MatrixClient(AsyncClient):
self.logger.info('Uploading encryption keys')
await self.keys_upload()
self.logger.info('Synchronizing rooms')
self.logger.info('Synchronizing state')
self._first_sync_performed.clear()
sync_token = self.loaded_sync_token
self.loaded_sync_token = ''
await self.sync(sync_filter={'room': {'timeline': {'limit': 1}}})
self._add_callbacks()
self.loaded_sync_token = sync_token
self._add_callbacks()
self._sync_devices_trust()
self._first_sync_performed.set()
self.logger.info('Rooms synchronized')
get_bus().post(MatrixSyncEvent(server_url=self.homeserver))
self.logger.info('State synchronized')
return login_res
def _sync_devices_trust(self):
all_devices = self.get_devices()
devices_to_trust: Dict[str, OlmDevice] = {}
untrusted_devices = {
device_id: device
for device_id, device in all_devices.items()
if not device.verified
}
if self._autotrust_devices:
devices_to_trust.update(untrusted_devices)
else:
if self._autotrust_devices_whitelist:
devices_to_trust.update(
{
device_id: device
for device_id, device in all_devices.items()
if device_id in self._autotrust_devices_whitelist
and device_id in untrusted_devices
}
)
if self._autotrust_rooms_whitelist:
devices_to_trust.update(
{
device_id: device
for room_id, devices in self.get_devices_by_room().items()
for device_id, device in devices.items() # type: ignore
if room_id in self._autotrust_rooms_whitelist
and device_id in untrusted_devices
}
)
if self._autotrust_users_whitelist:
devices_to_trust.update(
{
device_id: device
for user_id, devices in self.get_devices_by_user().items()
for device_id, device in devices.items() # type: ignore
if user_id in self._autotrust_users_whitelist
and device_id in untrusted_devices
}
)
for device in devices_to_trust.values():
self.verify_device(device)
self.logger.info(
'Device %s by user %s added to the whitelist', device.id, device.user_id
)
def get_devices_by_user(
self, user_id: str | None = None
) -> Dict[str, Dict[str, OlmDevice]] | Dict[str, OlmDevice]:
devices = {user: devices for user, devices in self.device_store.items()}
if user_id:
devices = devices.get(user_id, {})
return devices
def get_devices(self) -> Dict[str, OlmDevice]:
return {
device_id: device
for _, devices in self.device_store.items()
for device_id, device in devices.items()
}
def get_device(self, device_id: str) -> OlmDevice | None:
return self.get_devices().get(device_id)
def get_devices_by_room(
self, room_id: str | None = None
) -> Dict[str, Dict[str, OlmDevice]] | Dict[str, OlmDevice]:
devices = {
room_id: {
device_id: device
for _, devices in self.room_devices(room_id).items()
for device_id, device in devices.items()
}
for room_id in self.rooms.keys()
}
if room_id:
devices = devices.get(room_id, {})
return devices
def _add_callbacks(self):
self.add_event_callback(self._event_catch_all, Event)
self.add_event_callback(self._on_invite, InviteNameEvent) # type: ignore
@ -526,8 +623,8 @@ class MatrixPlugin(AsyncRunnablePlugin):
``pacman -S libolm``)
* **async_lru** (``pip install async_lru``)
Note that ``libolm`` and the ``[e2e]`` module are only required if you want E2E encryption
support.
Note that ``libolm`` and the ``[e2e]`` module are only required if you want
E2E encryption support.
Unless you configure the extension to use the token of an existing trusted
device, it is recommended that you mark the virtual device used by this
@ -555,19 +652,34 @@ class MatrixPlugin(AsyncRunnablePlugin):
Triggers:
* :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received.
* :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when a media message is received.
* :class:`platypush.message.event.matrix.MatrixRoomCreatedEvent`: when a room is created.
* :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a user joins a room.
* :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a user leaves a room.
* :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when the user is invited to a room.
* :class:`platypush.message.event.matrix.MatrixRoomTopicChangedEvent`: when the topic/title of a room changes.
* :class:`platypush.message.event.matrix.MatrixCallInviteEvent`: when the user is invited to a call.
* :class:`platypush.message.event.matrix.MatrixCallAnswerEvent`: when a called user wishes to pick the call.
* :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a called user exits the call.
* :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a sticker is sent to a room.
* :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`: when a message is received but the
client doesn't have the E2E keys to decrypt it, or encryption has not been enabled.
* :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a
message is received.
* :class:`platypush.message.event.matrix.MatrixMediaMessageEvent`: when
a media message is received.
* :class:`platypush.message.event.matrix.MatrixSyncEvent`: when the
startup synchronization has been completed and the plugin is ready to
use.
* :class:`platypush.message.event.matrix.MatrixRoomCreatedEvent`: when
a room is created.
* :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a
user joins a room.
* :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a
user leaves a room.
* :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when
the user is invited to a room.
* :class:`platypush.message.event.matrix.MatrixRoomTopicChangedEvent`:
when the topic/title of a room changes.
* :class:`platypush.message.event.matrix.MatrixCallInviteEvent`: when
the user is invited to a call.
* :class:`platypush.message.event.matrix.MatrixCallAnswerEvent`: when a
called user wishes to pick the call.
* :class:`platypush.message.event.matrix.MatrixCallHangupEvent`: when a
called user exits the call.
* :class:`platypush.message.event.matrix.MatrixStickerEvent`: when a
sticker is sent to a room.
* :class:`platypush.message.event.matrix.MatrixEncryptedMessageEvent`:
when a message is received but the client doesn't have the E2E keys
to decrypt it, or encryption has not been enabled.
"""
@ -580,6 +692,10 @@ class MatrixPlugin(AsyncRunnablePlugin):
device_name: str | None = 'platypush',
device_id: str | None = None,
autojoin_on_invite: bool = True,
autotrust_devices: bool = False,
autotrust_devices_whitelist: Collection[str] | None = None,
autotrust_users_whitelist: Collection[str] | None = None,
autotrust_rooms_whitelist: Collection[str] | None = None,
**kwargs,
):
"""
@ -594,16 +710,31 @@ class MatrixPlugin(AsyncRunnablePlugin):
the user has 2FA enabled.
:param server_url: Default Matrix instance base URL (default: ``https://matrix.to``).
:param user_id: user_id, in the format ``@user:example.org``, or just the username if the
account is hosted on the same server configured in the ``server_url``.
:param user_id: user_id, in the format ``@user:example.org``, or just
the username if the account is hosted on the same server configured in
the ``server_url``.
:param password: User password.
:param access_token: User access token.
:param device_name: The name of this device/connection (default: ``platypush``).
:param device_id: Use an existing ``device_id`` for the sessions.
:param autojoin_on_invite: Whether the account should automatically join rooms
upon invite. If false, then you may want to implement your own
logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`
event is received, and call the :meth:`.join` method if required.
:param autojoin_on_invite: Whether the account should automatically
join rooms upon invite. If false, then you may want to implement your
own logic in an event hook when a
:class:`platypush.message.event.matrix.MatrixRoomInviteEvent` event is
received, and call the :meth:`.join` method if required.
:param autotrust_devices: If set to True, the plugin will automatically
trust the devices on encrypted rooms. Set this property to True
only if you only plan to use a bot on trusted rooms. Note that if
no automatic trust mechanism is set you may need to explicitly
create your logic for trusting users - either with a hook when
:class:`platypush.message.event.matrix.MatrixSyncEvent` is
received, or when a room is joined, or before sending a message.
:param autotrust_devices_whitelist: Automatically trust devices with IDs
IDs provided in this list.
:param autotrust_users_whitelist: Automatically trust devices from the
user IDs provided in this list.
:param autotrust_rooms_whitelist: Automatically trust devices on the
room IDs provided in this list.
"""
super().__init__(**kwargs)
if not (server_url.startswith('http://') or server_url.startswith('https://')):
@ -614,57 +745,67 @@ class MatrixPlugin(AsyncRunnablePlugin):
if user_id and not re.match(user_id, '^@[a-zA-Z0-9.-_]+:.+'):
user_id = f'@{user_id}:{server_name}'
# self._matrix_proc: multiprocessing.Process | None = None
self._user_id = user_id
self._password = password
self._access_token = access_token
self._device_name = device_name
self._device_id = device_id
self._autojoin_on_invite = autojoin_on_invite
self._autotrust_devices = autotrust_devices
self._autotrust_devices_whitelist = set(autotrust_devices_whitelist or [])
self._autotrust_users_whitelist = set(autotrust_users_whitelist or [])
self._autotrust_rooms_whitelist = set(autotrust_rooms_whitelist or [])
self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore
self._credentials_file = os.path.join(self._workdir, 'credentials.json')
self._processed_responses = {}
self._client = self._get_client()
pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True)
def _get_client(self) -> AsyncClient:
def _get_client(self) -> MatrixClient:
return MatrixClient(
homeserver=self._server_url,
user=self._user_id,
credentials_file=self._credentials_file,
autojoin_on_invite=self._autojoin_on_invite,
autotrust_devices=self._autotrust_devices,
autotrust_devices_whitelist=self._autotrust_devices_whitelist,
autotrust_rooms_whitelist=self._autotrust_rooms_whitelist,
autotrust_users_whitelist=self._autotrust_users_whitelist,
device_id=self._device_id,
)
async def _login(self) -> AsyncClient:
@property
def client(self) -> MatrixClient:
if not self._client:
self._client = self._get_client()
return self._client
await self._client.login(
async def _login(self) -> MatrixClient:
await self.client.login(
password=self._password,
device_name=self._device_name,
token=self._access_token,
)
return self._client
return self.client
async def listen(self):
while not self.should_stop():
await self._login()
assert self._client
try:
await self._client.sync_forever(timeout=30000, full_state=True)
await self.client.sync_forever(timeout=30000, full_state=True)
except KeyboardInterrupt:
pass
finally:
try:
await self._client.close()
await self.client.close()
finally:
self._client = None
def _loop_execute(self, coro: Coroutine):
assert self._loop, 'The loop is not running'
try:
ret = asyncio.run_coroutine_threadsafe(coro, self._loop).result()
except OlmUnverifiedDeviceError as e:
@ -694,24 +835,25 @@ class MatrixPlugin(AsyncRunnablePlugin):
`image`. Default: `text`.
:param tx_id: Unique transaction ID to associate to this message.
:param ignore_unverified_devices: If true, unverified devices will be
ignored (default: False).
ignored. Otherwise, if the room is encrypted and it contains
devices that haven't been marked as trusted, the message
delivery may fail (default: False).
:return: .. schema:: matrix.MatrixEventIdSchema
"""
assert self._client, 'Client not connected'
assert self._loop, 'The loop is not running'
ret = self._loop_execute(
self._client.room_send(
message_type='m.' + message_type,
self.client.room_send(
message_type='m.room.message',
room_id=room_id,
tx_id=tx_id,
ignore_unverified_devices=ignore_unverified_devices,
content={
'msgtype': 'm.' + message_type,
'body': body,
},
)
)
assert self._loop
ret = asyncio.run_coroutine_threadsafe(
ret.transport_response.json(), self._loop
).result()
@ -726,8 +868,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
:param user_id: User ID.
:return: .. schema:: matrix.MatrixProfileSchema
"""
assert self._client, 'Client not connected'
profile = self._loop_execute(self._client.get_profile(user_id))
profile = self._loop_execute(self.client.get_profile(user_id)) # type: ignore
profile.user_id = user_id
return MatrixProfileSchema().dump(profile)
@ -739,8 +880,7 @@ class MatrixPlugin(AsyncRunnablePlugin):
:param room_id: room ID.
:return: .. schema:: matrix.MatrixRoomSchema
"""
assert self._client, 'Client not connected'
response = self._loop_execute(self._client.room_get_state(room_id))
response = self._loop_execute(self.client.room_get_state(room_id)) # type: ignore
assert not isinstance(response, RoomGetStateError), response.message
room_args = {'room_id': room_id, 'own_user_id': None, 'encrypted': False}
@ -762,26 +902,32 @@ class MatrixPlugin(AsyncRunnablePlugin):
return MatrixRoomSchema().dump(room)
@action
def get_devices(self):
def get_my_devices(self):
"""
Get the list of devices associated to the current user.
:return: .. schema:: matrix.MatrixDeviceSchema(many=True)
:return: .. schema:: matrix.MatrixMyDeviceSchema(many=True)
"""
assert self._client, 'Client not connected'
response = self._loop_execute(self._client.devices())
response = self._loop_execute(self.client.devices())
assert not isinstance(response, DevicesError), response.message
return MatrixDeviceSchema().dump(response.devices, many=True)
return MatrixMyDeviceSchema().dump(response.devices, many=True)
@action
def get_device(self, device_id: str):
"""
Get the info about a device given its ID.
:return: .. schema:: matrix.MatrixDeviceSchema
"""
return MatrixDeviceSchema().dump(self._get_device(device_id))
@action
def get_joined_rooms(self):
"""
Retrieve the rooms that the user has joined.
"""
assert self._client, 'Client not connected'
response = self._loop_execute(self._client.joined_rooms())
response = self._loop_execute(self.client.joined_rooms())
assert not isinstance(response, JoinedRoomsError), response.message
return [self.get_room(room_id).output for room_id in response.rooms] # type: ignore
@action
@ -789,8 +935,32 @@ class MatrixPlugin(AsyncRunnablePlugin):
"""
Synchronize the E2EE keys with the homeserver.
"""
assert self._client, 'Client not connected'
self._loop_execute(self._client.keys_upload())
self._loop_execute(self.client.keys_upload())
def _get_device(self, device_id: str):
device = self.client.get_device(device_id)
assert device, f'No such device_id: {device_id}'
return device
@action
def trust_device(self, device_id: str):
"""
Mark a device as trusted.
:param device_id: Device ID.
"""
device = self._get_device(device_id)
self.client.verify_device(device)
@action
def untrust_device(self, device_id: str):
"""
Mark a device as untrusted.
:param device_id: Device ID.
"""
device = self._get_device(device_id)
self.client.unverify_device(device)
# vim:sw=4:ts=4:et:

View file

@ -2,6 +2,8 @@ manifest:
events:
platypush.message.event.matrix.MatrixMessageEvent: when a message is received.
platypush.message.event.matrix.MatrixMediaMessageEvent: when a media message is received.
platypush.message.event.matrix.MatrixSyncEvent: when the startup
synchronization has been completed and the plugin is ready to use.
platypush.message.event.matrix.MatrixRoomCreatedEvent: when a room is created.
platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a room.
platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a room.

View file

@ -97,6 +97,46 @@ class MatrixDeviceSchema(Schema):
},
)
user_id = fields.String(
required=True,
metadata={
'description': 'User ID associated to the device',
'example': '@myuser:matrix.example.org',
},
)
display_name = fields.String(
metadata={
'description': 'Display name of the device',
'example': 'Element Android',
},
)
blacklisted = fields.Boolean()
deleted = fields.Boolean(default=False)
ignored = fields.Boolean()
verified = fields.Boolean()
keys = fields.Dict(
metadata={
'description': 'Encryption keys supported by the device',
'example': {
'curve25519': 'BtlB0vaQmtYFsvOYkmxyzw9qP5yGjuAyRh4gXh3q',
'ed25519': 'atohIK2FeVlYoY8xxpZ1bhDbveD+HA2DswNFqUxP',
},
},
)
class MatrixMyDeviceSchema(Schema):
device_id = fields.String(
required=True,
attribute='id',
metadata={
'description': 'ABCDEFG',
},
)
display_name = fields.String(
metadata={
'description': 'Device display name',