From cc29136db7caa55bc6bb6c6f0c1766bc1a7e028d Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 15 Jul 2022 00:37:21 +0200 Subject: [PATCH] [#2] Support for caching rooms info and exposing them in the events --- platypush/plugins/matrix/__init__.py | 66 +++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 11 deletions(-) diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py index 6986dff1..42f9fc7d 100644 --- a/platypush/plugins/matrix/__init__.py +++ b/platypush/plugins/matrix/__init__.py @@ -33,7 +33,7 @@ class RetrieveWorker(ABC): raise NotImplementedError() @abstractmethod - def _process_response(self, rs: dict) -> dict: + def _process_response(self, rs: dict | Collection[dict]) -> dict: raise NotImplementedError() def __call__(self, id: str) -> Tuple[str, dict]: @@ -60,15 +60,20 @@ class UserRetrieveWorker(RetrieveWorker): } -class RoomRetrieveWorker: +class RoomRetrieveWorker(RetrieveWorker): def _url(self, id: str) -> str: - return f'/_matrix/client/r0/room/{id}' + return f'/_matrix/client/v3/rooms/{id}/state' - def _process_response(self, rs: dict) -> dict: - return { - 'display_name': rs.get('displayname'), - 'avatar_url': rs.get('avatar_url'), - } + def _process_response(self, rs: Collection[dict]) -> dict: + info = {} + for event in rs: + event_type = event.get('type') + if event_type == 'm.room.name': + info['name'] = event.get('content', {}).get('name') + elif event_type == 'm.room.topic': + info['name'] = event.get('content', {}).get('topic') + + return info class MatrixPlugin(RunnablePlugin): @@ -128,8 +133,8 @@ class MatrixPlugin(RunnablePlugin): self._credentials_file = os.path.join(self._workdir, 'credentials.json') self._users_cache_file = os.path.join(self._workdir, 'users.json') self._rooms_cache_file = os.path.join(self._workdir, 'rooms.json') - self._users_cache = {} - self._rooms_cache = {} + self._users_cache: Dict[str, dict] = {} + self._rooms_cache: Dict[str, dict] = {} self._set_credentials(username, password, access_token, overwrite=True) def _set_credentials( @@ -230,9 +235,12 @@ class MatrixPlugin(RunnablePlugin): ) -> Optional[MatrixEvent]: evt_type = event.get('type') evt_class = None + room_info = self._rooms_cache.get(room_id, {}) args: Dict[str, Any] = { 'server_url': self._server_url, 'room_id': room_id, + 'room_name': room_info.get('name'), + 'room_topic': room_info.get('topic'), } if event.get('sender') and isinstance(event.get('sender'), str): @@ -249,6 +257,7 @@ class MatrixPlugin(RunnablePlugin): if evt_type == 'm.room.topic': evt_class = MatrixRoomTopicChangeEvent args['topic'] = event.get('content', {}).get('topic') # type: ignore + # TODO Handle encrypted rooms events (`m.room.encrypted`) elif evt_type == 'm.room.message': evt_class = MatrixMessageEvent args['body'] = event.get('content', {}).get('body') # type: ignore @@ -308,6 +317,20 @@ class MatrixPlugin(RunnablePlugin): for user_id, info in pool_res } + def _retrieve_rooms_info(self, rooms: Collection[str]) -> Dict[str, dict]: + rooms_info = {room: {} for room in rooms} + retrieve = RoomRetrieveWorker(self._server_url, self._access_token or '') + with multiprocessing.Pool(4) as pool: + pool_res = pool.map(retrieve, rooms_info.keys()) + + return { + room_id: { + 'room_id': room_id, + **info, + } + for room_id, info in pool_res + } + def _extract_senders(self, rooms) -> Dict[str, dict]: cache_has_changes = False senders = set() @@ -340,14 +363,34 @@ class MatrixPlugin(RunnablePlugin): return senders_map + def _extract_rooms(self, rooms: Collection[str]) -> Dict[str, dict]: + missing_rooms_info = { + room_id for room_id in rooms if not self._rooms_cache.get(room_id) + } + + if missing_rooms_info: + self._rooms_cache.update(self._retrieve_rooms_info(missing_rooms_info)) + self._rewrite_rooms_cache() + + return { + room_id: self._rooms_cache.get( + room_id, + { + 'room_id': room_id, + }, + ) + for room_id in rooms + } + def _process_events(self, events: dict) -> Collection[MatrixEvent]: rooms = events.get('rooms', {}) joined_rooms = rooms.get('join', {}) invited_rooms = rooms.get('invite', {}) parsed_events = [] senders = self._extract_senders(joined_rooms.values()) + self._extract_rooms(joined_rooms.keys()) - # Create events + # Create joined rooms events for room_id, room in joined_rooms.items(): room_events = room.get('timeline', {}).get('events', []) parsed_room_events = [ @@ -357,6 +400,7 @@ class MatrixPlugin(RunnablePlugin): parsed_events.extend([evt for evt in parsed_room_events if evt]) + # Create invite events for room_id, room in invited_rooms.items(): room_events = room.get('invite_state', {}).get('events', []) parsed_room_event = self._parse_invite_event(