diff --git a/platypush/config/__init__.py b/platypush/config/__init__.py index e9acfec45..481bd2751 100644 --- a/platypush/config/__init__.py +++ b/platypush/config/__init__.py @@ -213,11 +213,10 @@ class Config: config['scripts_dir'] = os.path.abspath( os.path.expanduser(file_config[section]) ) - elif ( - 'disabled' not in file_config[section] - or file_config[section]['disabled'] is False - ): - config[section] = file_config[section] + else: + section_config = file_config.get(section, {}) or {} + if not section_config.get('disabled'): + config[section] = section_config return config diff --git a/platypush/message/event/matrix.py b/platypush/message/event/matrix.py new file mode 100644 index 000000000..d1ac3f8d7 --- /dev/null +++ b/platypush/message/event/matrix.py @@ -0,0 +1,103 @@ +from abc import ABC +from datetime import datetime +from typing import Dict, Any + +from platypush.message.event import Event + + +class MatrixEvent(Event, ABC): + """ + Base matrix event. + """ + + def __init__( + self, + *args, + server_url: str, + sender_id: str | None = None, + sender_display_name: str | None = None, + sender_avatar_url: str | None = None, + room_id: str | None = None, + room_name: str | None = None, + room_topic: str | None = None, + server_timestamp: datetime | None = None, + **kwargs + ): + """ + :param server_url: Base server URL. + :param sender_id: The event's sender ID. + :param sender_display_name: The event's sender display name. + :param sender_avatar_url: The event's sender avatar URL. + :param room_id: Event room ID. + :param room_name: The name of the room associated to the event. + :param room_topic: The topic of the room associated to the event. + :param server_timestamp: The server timestamp of the event. + """ + evt_args: Dict[str, Any] = { + 'server_url': server_url, + } + + if sender_id: + evt_args['sender_id'] = sender_id + if sender_display_name: + evt_args['sender_display_name'] = sender_display_name + if sender_avatar_url: + evt_args['sender_avatar_url'] = sender_avatar_url + if room_id: + evt_args['room_id'] = room_id + if room_name: + evt_args['room_name'] = room_name + if room_topic: + evt_args['room_topic'] = room_topic + if server_timestamp: + evt_args['server_timestamp'] = server_timestamp + + super().__init__(*args, **evt_args, **kwargs) + + +class MatrixMessageEvent(MatrixEvent): + """ + Event triggered when a message is received on a subscribed room. + """ + + def __init__(self, *args, body: str, **kwargs): + """ + :param body: The body of the message. + """ + super().__init__(*args, body=body, **kwargs) + + +class MatrixRoomJoinEvent(MatrixEvent): + """ + Event triggered when a user joins a room. + """ + + +class MatrixRoomLeaveEvent(MatrixEvent): + """ + Event triggered when a user leaves a room. + """ + + +class MatrixRoomInviteEvent(MatrixEvent): + """ + Event triggered when a user is invited to a room. + """ + + +class MatrixRoomInviteMeEvent(MatrixEvent): + """ + Event triggered when the currently logged in user is invited to a room. + """ + + +class MatrixRoomTopicChangeEvent(MatrixEvent): + """ + Event triggered when the topic/title of a room changes. + """ + + def __init__(self, *args, topic: str, **kwargs): + """ + :param topic: New room topic. + """ + super().__init__(*args, topic=topic, **kwargs) diff --git a/platypush/message/request/__init__.py b/platypush/message/request/__init__.py index 40d189d4d..46f52452b 100644 --- a/platypush/message/request/__init__.py +++ b/platypush/message/request/__init__.py @@ -12,17 +12,30 @@ from platypush.config import Config from platypush.context import get_plugin from platypush.message import Message from platypush.message.response import Response -from platypush.utils import get_hash, get_module_and_method_from_action, get_redis_queue_name_by_message, \ - is_functional_procedure +from platypush.utils import ( + get_hash, + get_module_and_method_from_action, + get_redis_queue_name_by_message, + is_functional_procedure, +) logger = logging.getLogger('platypush') class Request(Message): - """ Request message class """ + """Request message class""" - def __init__(self, target, action, origin=None, id=None, backend=None, - args=None, token=None, timestamp=None): + def __init__( + self, + target, + action, + origin=None, + id=None, + backend=None, + args=None, + token=None, + timestamp=None, + ): """ Params: target -- Target node [Str] @@ -48,9 +61,13 @@ class Request(Message): @classmethod def build(cls, msg): msg = super().parse(msg) - args = {'target': msg.get('target', Config.get('device_id')), 'action': msg['action'], - 'args': msg.get('args', {}), 'id': msg['id'] if 'id' in msg else cls._generate_id(), - 'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time()} + args = { + 'target': msg.get('target', Config.get('device_id')), + 'action': msg['action'], + 'args': msg.get('args', {}), + 'id': msg['id'] if 'id' in msg else cls._generate_id(), + 'timestamp': msg['_timestamp'] if '_timestamp' in msg else time.time(), + } if 'origin' in msg: args['origin'] = msg['origin'] @@ -61,7 +78,7 @@ class Request(Message): @staticmethod def _generate_id(): _id = '' - for i in range(0, 16): + for _ in range(0, 16): _id += '%.2x' % random.randint(0, 255) return _id @@ -84,9 +101,14 @@ class Request(Message): return proc_config(*args, **kwargs) - proc = Procedure.build(name=proc_name, requests=proc_config['actions'], - _async=proc_config['_async'], args=self.args, - backend=self.backend, id=self.id) + proc = Procedure.build( + name=proc_name, + requests=proc_config['actions'], + _async=proc_config['_async'], + args=self.args, + backend=self.backend, + id=self.id, + ) return proc.execute(*args, **kwargs) @@ -112,7 +134,7 @@ class Request(Message): if isinstance(value, str): value = self.expand_value_from_context(value, **context) - elif isinstance(value, dict) or isinstance(value, list): + elif isinstance(value, (dict, list)): self._expand_context(event_args=value, **context) event_args[key] = value @@ -132,7 +154,11 @@ class Request(Message): try: exec('{}="{}"'.format(k, re.sub(r'(^|[^\\])"', '\1\\"', v))) except Exception as e: - logger.debug('Could not set context variable {}={}: {}'.format(k, v, str(e))) + logger.debug( + 'Could not set context variable {}={}: {}'.format( + k, v, str(e) + ) + ) logger.debug('Context: {}'.format(context)) parsed_value = '' @@ -152,7 +178,7 @@ class Request(Message): if callable(context_value): context_value = context_value() - if isinstance(context_value, range) or isinstance(context_value, tuple): + if isinstance(context_value, (range, tuple)): context_value = [*context_value] if isinstance(context_value, datetime.date): context_value = context_value.isoformat() @@ -162,7 +188,7 @@ class Request(Message): parsed_value += prefix + ( json.dumps(context_value) - if isinstance(context_value, list) or isinstance(context_value, dict) + if isinstance(context_value, (list, dict)) else str(context_value) ) else: @@ -205,6 +231,9 @@ class Request(Message): """ def _thread_func(_n_tries, errors=None): + from platypush.context import get_bus + from platypush.plugins import RunnablePlugin + response = None try: @@ -221,11 +250,15 @@ class Request(Message): return response else: action = self.expand_value_from_context(self.action, **context) - (module_name, method_name) = get_module_and_method_from_action(action) + (module_name, method_name) = get_module_and_method_from_action( + action + ) plugin = get_plugin(module_name) except Exception as e: logger.exception(e) - msg = 'Uncaught pre-processing exception from action [{}]: {}'.format(self.action, str(e)) + msg = 'Uncaught pre-processing exception from action [{}]: {}'.format( + self.action, str(e) + ) logger.warning(msg) response = Response(output=None, errors=[msg]) self._send_response(response) @@ -243,24 +276,37 @@ class Request(Message): response = plugin.run(method_name, args) if not response: - logger.warning('Received null response from action {}'.format(action)) + logger.warning( + 'Received null response from action {}'.format(action) + ) else: if response.is_error(): - logger.warning(('Response processed with errors from ' + - 'action {}: {}').format( - action, str(response))) + logger.warning( + ( + 'Response processed with errors from ' + 'action {}: {}' + ).format(action, str(response)) + ) elif not response.disable_logging: - logger.info('Processed response from action {}: {}'. - format(action, str(response))) + logger.info( + 'Processed response from action {}: {}'.format( + action, str(response) + ) + ) except (AssertionError, TimeoutError) as e: plugin.logger.exception(e) - logger.warning('{} from action [{}]: {}'.format(type(e), action, str(e))) + logger.warning( + '{} from action [{}]: {}'.format(type(e), action, str(e)) + ) response = Response(output=None, errors=[str(e)]) except Exception as e: # Retry mechanism plugin.logger.exception(e) - logger.warning(('Uncaught exception while processing response ' + - 'from action [{}]: {}').format(action, str(e))) + logger.warning( + ( + 'Uncaught exception while processing response ' + + 'from action [{}]: {}' + ).format(action, str(e)) + ) errors = errors or [] if str(e) not in errors: @@ -269,17 +315,21 @@ class Request(Message): response = Response(output=None, errors=errors) if _n_tries - 1 > 0: logger.info('Reloading plugin {} and retrying'.format(module_name)) - get_plugin(module_name, reload=True) - response = _thread_func(_n_tries=_n_tries-1, errors=errors) + plugin = get_plugin(module_name, reload=True) + if isinstance(plugin, RunnablePlugin): + plugin.bus = get_bus() + plugin.start() + + response = _thread_func(_n_tries=_n_tries - 1, errors=errors) finally: self._send_response(response) - return response - token_hash = Config.get('token_hash') + return response - if token_hash: - if self.token is None or get_hash(self.token) != token_hash: - raise PermissionError() + stored_token_hash = Config.get('token_hash') + token = getattr(self, 'token', '') + if stored_token_hash and get_hash(token) != stored_token_hash: + raise PermissionError() if _async: Thread(target=_thread_func, args=(n_tries,)).start() @@ -292,15 +342,18 @@ class Request(Message): the message into a UTF-8 JSON string """ - return json.dumps({ - 'type': 'request', - 'target': self.target, - 'action': self.action, - 'args': self.args, - 'origin': self.origin if hasattr(self, 'origin') else None, - 'id': self.id if hasattr(self, 'id') else None, - 'token': self.token if hasattr(self, 'token') else None, - '_timestamp': self.timestamp, - }) + return json.dumps( + { + 'type': 'request', + 'target': self.target, + 'action': self.action, + 'args': self.args, + 'origin': self.origin if hasattr(self, 'origin') else None, + 'id': self.id if hasattr(self, 'id') else None, + 'token': self.token if hasattr(self, 'token') else None, + '_timestamp': self.timestamp, + } + ) + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/__init__.py b/platypush/plugins/__init__.py index 8338620ec..2d75697f5 100644 --- a/platypush/plugins/__init__.py +++ b/platypush/plugins/__init__.py @@ -19,12 +19,12 @@ def action(f): result = f(*args, **kwargs) if result and isinstance(result, Response): - result.errors = result.errors \ - if isinstance(result.errors, list) else [result.errors] + result.errors = ( + result.errors if isinstance(result.errors, list) else [result.errors] + ) response = result elif isinstance(result, tuple) and len(result) == 2: - response.errors = result[1] \ - if isinstance(result[1], list) else [result[1]] + response.errors = result[1] if isinstance(result[1], list) else [result[1]] if len(response.errors) == 1 and response.errors[0] is None: response.errors = [] @@ -39,12 +39,14 @@ def action(f): return _execute_action -class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init] - """ Base plugin class """ +class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-to-init] + """Base plugin class""" def __init__(self, **kwargs): super().__init__() - self.logger = logging.getLogger('platypush:plugin:' + get_plugin_name_by_class(self.__class__)) + self.logger = logging.getLogger( + 'platypush:plugin:' + get_plugin_name_by_class(self.__class__) + ) if 'logging' in kwargs: self.logger.setLevel(getattr(logging, kwargs['logging'].upper())) @@ -53,8 +55,9 @@ class Plugin(EventGenerator, ExtensionWithManifest): # lgtm [py/missing-call-t ) def run(self, method, *args, **kwargs): - assert method in self.registered_actions, '{} is not a registered action on {}'.\ - format(method, self.__class__.__name__) + assert ( + method in self.registered_actions + ), '{} is not a registered action on {}'.format(method, self.__class__.__name__) return getattr(self, method)(*args, **kwargs) @@ -62,6 +65,7 @@ class RunnablePlugin(Plugin): """ Class for runnable plugins - i.e. plugins that have a start/stop method and can be started. """ + def __init__(self, poll_interval: Optional[float] = None, **kwargs): """ :param poll_interval: How often the :meth:`.loop` function should be execute (default: None, no pause/interval). @@ -78,6 +82,9 @@ class RunnablePlugin(Plugin): def should_stop(self): return self._should_stop.is_set() + def wait_stop(self, timeout=None): + return self._should_stop.wait(timeout=timeout) + def start(self): set_thread_name(self.__class__.__name__) self._thread = threading.Thread(target=self._runner) diff --git a/platypush/plugins/matrix/__init__.py b/platypush/plugins/matrix/__init__.py new file mode 100644 index 000000000..42f9fc7dc --- /dev/null +++ b/platypush/plugins/matrix/__init__.py @@ -0,0 +1,488 @@ +import datetime +import json +import multiprocessing +import os +import pathlib +import requests +from abc import ABC, abstractmethod +from urllib.parse import urljoin +from typing import Optional, Collection, Dict, Tuple, Any + +from platypush.config import Config +from platypush.context import get_bus +from platypush.message.event.matrix import ( + MatrixEvent, + MatrixRoomTopicChangeEvent, + MatrixMessageEvent, + MatrixRoomJoinEvent, + MatrixRoomLeaveEvent, + MatrixRoomInviteEvent, + MatrixRoomInviteMeEvent, +) + +from platypush.plugins import RunnablePlugin, action + + +class RetrieveWorker(ABC): + def __init__(self, server_url: str, access_token: str): + self._server_url = server_url + self._access_token = access_token + + @abstractmethod + def _url(self, id: int | str) -> str: + raise NotImplementedError() + + @abstractmethod + def _process_response(self, rs: dict | Collection[dict]) -> dict: + raise NotImplementedError() + + def __call__(self, id: str) -> Tuple[str, dict]: + url = urljoin(self._server_url, self._url(id)) + rs = requests.get( + url, + headers={ + 'Authorization': f'Bearer {self._access_token}', + }, + ) + + rs.raise_for_status() + return (id, self._process_response(rs.json())) + + +class UserRetrieveWorker(RetrieveWorker): + def _url(self, id: str) -> str: + return f'/_matrix/client/r0/profile/{id}' + + def _process_response(self, rs: dict) -> dict: + return { + 'display_name': rs.get('displayname'), + 'avatar_url': rs.get('avatar_url'), + } + + +class RoomRetrieveWorker(RetrieveWorker): + def _url(self, id: str) -> str: + return f'/_matrix/client/v3/rooms/{id}/state' + + def _process_response(self, rs: Collection[dict]) -> dict: + info = {} + for event in rs: + event_type = event.get('type') + if event_type == 'm.room.name': + info['name'] = event.get('content', {}).get('name') + elif event_type == 'm.room.topic': + info['name'] = event.get('content', {}).get('topic') + + return info + + +class MatrixPlugin(RunnablePlugin): + """ + Matrix chat integration. + + Triggers: + + * :class:`platypush.message.event.matrix.MatrixMessageEvent`: when a message is received. + * :class:`platypush.message.event.matrix.MatrixRoomJoinEvent`: when a user joins a room. + * :class:`platypush.message.event.matrix.MatrixRoomLeaveEvent`: when a user leaves a room. + * :class:`platypush.message.event.matrix.MatrixRoomInviteEvent`: when a user (other than the + currently logged one) is invited to a room. + * :class:`platypush.message.event.matrix.MatrixRoomMeInviteEvent`: when the currently logged in + user is invited to a room. + * :class:`platypush.message.event.matrix.MatrixRoomTopicChangeEvent`: when the topic/title of a room changes. + + """ + + def __init__( + self, + server_url: str = 'https://matrix.to', + username: str | None = None, + password: str | None = None, + access_token: str | None = None, + autojoin_on_invite: bool = False, + **kwargs, + ): + """ + Authentication requires either username/password or an access token. + + If you don't want to provide cleartext credentials in the configuration, you can + retrieve an access token offline through the following request:: + + curl -XPOST '{"type":"m.login.password", "user":"username", "password":"password"}' \ + "https://matrix.example.com/_matrix/client/r0/login" + + This may be required if the user or the instance enforce 2FA. + + :param server_url: Default Matrix instance base URL (default: ``https://matrix.to``). + :param username: Default username. Provide either username/password _or_ an access token. + :param password: Default password. Provide either username/password _or_ an access token. + :param access_token: Default access token. Provide either username/password _or_ an access token. + :param autojoin_on_invite: Whether the account should automatically join rooms + upon invite. If false (default value), then you may want to implement your own + logic in an event hook when a :class:`platypush.message.event.matrix.MatrixRoomInviteMeEvent` + event is received, and call the :meth:`.join` method if required. + """ + super().__init__(**kwargs) + self._server_url = server_url + self._user_id = None + self._autojoin_on_invite = autojoin_on_invite + self._workdir = os.path.join(Config.get('workdir'), 'matrix') # type: ignore + pathlib.Path(self._workdir).mkdir(parents=True, exist_ok=True) + + self._sessions_file = os.path.join(self._workdir, 'sessions.json') + self._credentials_file = os.path.join(self._workdir, 'credentials.json') + self._users_cache_file = os.path.join(self._workdir, 'users.json') + self._rooms_cache_file = os.path.join(self._workdir, 'rooms.json') + self._users_cache: Dict[str, dict] = {} + self._rooms_cache: Dict[str, dict] = {} + self._set_credentials(username, password, access_token, overwrite=True) + + def _set_credentials( + self, + username: str | None = None, + password: str | None = None, + access_token: str | None = None, + overwrite: bool = False, + ): + if username or overwrite: + self._username = username + if password or overwrite: + self._password = password + if access_token or overwrite: + self._access_token = access_token + + def _execute(self, url: str, method: str = 'get', **kwargs): + if self._access_token: + kwargs['headers'] = { + 'Authorization': f'Bearer {self._access_token}', + **kwargs.get('headers', {}), + } + + url = urljoin(self._server_url, f'/_matrix/client/{url.lstrip("/")}') + req_method = getattr(requests, method.lower()) + rs = req_method(url, **kwargs) + rs.raise_for_status() + rs = rs.json() + assert not rs.get('error'), rs.get('error') + return rs + + def _save_credentials(self, credentials: dict): + with open(self._credentials_file, 'w') as f: + json.dump(credentials, f) + + def _refresh_user_id(self): + devices = self._execute('/v3/devices').get('devices', []) + assert devices, 'The user is not logged into any devices' + self._user_id = devices[0]['user_id'] + + @action + def login( + self, + server_url: str | None = None, + username: str | None = None, + password: str | None = None, + access_token: str | None = None, + ): + """ + Login to an instance if username/password/access_token were not specified in the plugin + configuration. Otherwise, change the currently logged user or instance. + + :param server_url: New Matrix instance base URL. + :param username: New username. + :param password: New user password. + :param access_token: New access token. + """ + self._server_url = server_url or self._server_url + self._set_credentials(username, password, access_token, overwrite=False) + + if self._access_token: + self._refresh_user_id() + elif self._username and self._password: + rs = self._execute( + '/r0/login', + method='post', + json={ + 'type': 'm.login.password', + 'user': self._username, + 'password': self._password, + 'initial_device_display_name': 'Platypush Matrix integration', + }, + ) + + assert rs.get('access_token'), 'No access token provided by the server' + self._access_token = rs['access_token'] + self._user_id = rs['user_id'] + self._save_credentials(rs) + elif os.path.isfile(self._credentials_file): + with open(self._credentials_file, 'r') as f: + self._access_token = json.load(f)['access_token'] + self._refresh_user_id() + else: + raise AssertionError( + 'No username, password and access token provided nor stored' + ) + + self.logger.info( + f'Successfully logged in to {self._server_url} as {self._user_id}' + ) + + @staticmethod + def _timestamp_to_datetime(t: int | float) -> datetime.datetime: + return datetime.datetime.fromtimestamp(t / 1000) + + def _parse_event( + self, room_id: str, event: dict, users: dict + ) -> Optional[MatrixEvent]: + evt_type = event.get('type') + evt_class = None + room_info = self._rooms_cache.get(room_id, {}) + args: Dict[str, Any] = { + 'server_url': self._server_url, + 'room_id': room_id, + 'room_name': room_info.get('name'), + 'room_topic': room_info.get('topic'), + } + + if event.get('sender') and isinstance(event.get('sender'), str): + cached_user = users.get(event['sender'], {}) + args['sender_id'] = event['sender'] + args['sender_display_name'] = cached_user.get('display_name') + args['sender_avatar_url'] = cached_user.get('avatar_url') + + if event.get('origin_server_ts'): + args['server_timestamp'] = self._timestamp_to_datetime( + event['origin_server_ts'] + ) + + if evt_type == 'm.room.topic': + evt_class = MatrixRoomTopicChangeEvent + args['topic'] = event.get('content', {}).get('topic') # type: ignore + # TODO Handle encrypted rooms events (`m.room.encrypted`) + elif evt_type == 'm.room.message': + evt_class = MatrixMessageEvent + args['body'] = event.get('content', {}).get('body') # type: ignore + elif evt_type == 'm.room.member': + membership = event.get('content', {}).get('membership') + if membership == 'join': + evt_class = MatrixRoomJoinEvent + elif membership == 'invite': + evt_class = MatrixRoomInviteEvent + elif membership == 'leave': + evt_class = MatrixRoomLeaveEvent + + if evt_class: + return evt_class(**args) + + def _parse_invite_event( + self, room_id: str, events: Collection[dict] + ) -> MatrixRoomInviteMeEvent: + evt_args: Dict[str, Any] = { + 'server_url': self._server_url, + 'room_id': room_id, + } + + for event in events: + evt_type = event.get('type') + if evt_type == 'm.room.name': + evt_args['room_name'] = event.get('content', {}).get('name') + elif evt_type == 'm.room.topic': + evt_args['room_topic'] = event.get('content', {}).get('topic') + if event.get('origin_server_ts'): + evt_args['server_timestamp'] = self._timestamp_to_datetime( + event['origin_server_ts'] + ) + + if evt_args.get('room_name'): + self._rooms_cache[room_id] = { + 'room_id': room_id, + 'room_name': evt_args['room_name'], + 'room_topic': evt_args.get('room_topic'), + } + + self._rewrite_rooms_cache() + + return MatrixRoomInviteMeEvent(**evt_args) + + def _retrieve_users_info(self, users: Collection[str]) -> Dict[str, dict]: + users_info = {user: {} for user in users} + retrieve = UserRetrieveWorker(self._server_url, self._access_token or '') + with multiprocessing.Pool(4) as pool: + pool_res = pool.map(retrieve, users_info.keys()) + + return { + user_id: { + 'user_id': user_id, + **info, + } + for user_id, info in pool_res + } + + def _retrieve_rooms_info(self, rooms: Collection[str]) -> Dict[str, dict]: + rooms_info = {room: {} for room in rooms} + retrieve = RoomRetrieveWorker(self._server_url, self._access_token or '') + with multiprocessing.Pool(4) as pool: + pool_res = pool.map(retrieve, rooms_info.keys()) + + return { + room_id: { + 'room_id': room_id, + **info, + } + for room_id, info in pool_res + } + + def _extract_senders(self, rooms) -> Dict[str, dict]: + cache_has_changes = False + senders = set() + + for room in rooms: + room_events = room.get('timeline', {}).get('events', []) + for evt in room_events: + if evt.get('type') == 'm.room.member': + cache_has_changes = True + self._users_cache[evt['sender']] = { + 'user_id': evt['sender'], + 'display_name': evt.get('content', {}).get('displayname'), + 'avatar_url': evt.get('content', {}).get('avatar_url'), + } + + senders.update({evt['sender'] for evt in room_events if evt.get('sender')}) + + missing_senders = {user for user in senders if user not in self._users_cache} + + if missing_senders: + cache_has_changes = True + self._users_cache.update(self._retrieve_users_info(missing_senders)) + + senders_map = { + user: self._users_cache.get(user, {'user_id': user}) for user in senders + } + + if cache_has_changes: + self._rewrite_users_cache() + + return senders_map + + def _extract_rooms(self, rooms: Collection[str]) -> Dict[str, dict]: + missing_rooms_info = { + room_id for room_id in rooms if not self._rooms_cache.get(room_id) + } + + if missing_rooms_info: + self._rooms_cache.update(self._retrieve_rooms_info(missing_rooms_info)) + self._rewrite_rooms_cache() + + return { + room_id: self._rooms_cache.get( + room_id, + { + 'room_id': room_id, + }, + ) + for room_id in rooms + } + + def _process_events(self, events: dict) -> Collection[MatrixEvent]: + rooms = events.get('rooms', {}) + joined_rooms = rooms.get('join', {}) + invited_rooms = rooms.get('invite', {}) + parsed_events = [] + senders = self._extract_senders(joined_rooms.values()) + self._extract_rooms(joined_rooms.keys()) + + # Create joined rooms events + for room_id, room in joined_rooms.items(): + room_events = room.get('timeline', {}).get('events', []) + parsed_room_events = [ + self._parse_event(room_id=room_id, event=event, users=senders) + for event in room_events + ] + + parsed_events.extend([evt for evt in parsed_room_events if evt]) + + # Create invite events + for room_id, room in invited_rooms.items(): + room_events = room.get('invite_state', {}).get('events', []) + parsed_room_event = self._parse_invite_event( + room_id=room_id, events=room_events + ) + parsed_events.append(parsed_room_event) + + if self._autojoin_on_invite: + self.join(room_id) + + parsed_events.sort(key=lambda e: e.server_timestamp) + return parsed_events + + def _reload_users_cache(self): + if os.path.isfile(self._users_cache_file): + with open(self._users_cache_file, 'r') as f: + self._users_cache.update(json.load(f)) + + def _rewrite_users_cache(self): + with open(self._users_cache_file, 'w') as f: + json.dump(self._users_cache, f) + + def _reload_rooms_cache(self): + if os.path.isfile(self._rooms_cache_file): + with open(self._rooms_cache_file, 'r') as f: + self._rooms_cache.update(json.load(f)) + + def _rewrite_rooms_cache(self): + with open(self._rooms_cache_file, 'w') as f: + json.dump(self._rooms_cache, f) + + @action + def sync(self): + """ + Sync the state for the currently logged session. + """ + next_batch = None + sessions = {} + if os.path.isfile(self._sessions_file): + with open(self._sessions_file, 'r') as f: + sessions = json.load(f) + next_batch = sessions.get(self._user_id, {}).get('next_batch') + + if not next_batch: + self.logger.info('Synchronizing Matrix events') + + rs = self._execute('/r0/sync', params={'since': next_batch}) + events = self._process_events(rs) + if events and next_batch: + for event in events: + get_bus().post(event) + + if not sessions.get(self._user_id): + sessions[self._user_id] = {} + + sessions[self._user_id]['next_batch'] = rs.get('next_batch') + with open(self._sessions_file, 'w') as f: + json.dump(sessions, f) + + if not next_batch: + self.logger.info('Matrix events synchronized') + + @action + def join(self, room_id: str): + """ + Join a room by ID. + + :param room_id: Room ID or alias. + """ + self._execute(f'/v3/join/{room_id}', method='post') + self.logger.info('Successfully joined room %s', room_id) + + def main(self): + self.login() + self._reload_users_cache() + self._reload_rooms_cache() + + while not self._should_stop.is_set(): + try: + self.sync() + finally: + self._should_stop.wait(timeout=10) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/matrix/manifest.yaml b/platypush/plugins/matrix/manifest.yaml new file mode 100644 index 000000000..5449b9302 --- /dev/null +++ b/platypush/plugins/matrix/manifest.yaml @@ -0,0 +1,10 @@ +manifest: + events: + platypush.message.event.matrix.MatrixMessageEvent: when a message is received. + platypush.message.event.matrix.MatrixRoomJoinEvent: when a user joins a room. + platypush.message.event.matrix.MatrixRoomLeaveEvent: when a user leaves a room. + platypush.message.event.matrix.MatrixRoomInviteEvent: when a user (other than the currently logged one) is invited to a room. + platypush.message.event.matrix.MatrixRoomMeInviteEvent: when the currently logged in user is invited to a room. + platypush.message.event.matrix.MatrixRoomTopicChangeEvent: when the topic/title of a room changes. + package: platypush.plugins.matrix + type: plugin diff --git a/platypush/plugins/ntfy/__init__.py b/platypush/plugins/ntfy/__init__.py index 064275723..21ad1389c 100644 --- a/platypush/plugins/ntfy/__init__.py +++ b/platypush/plugins/ntfy/__init__.py @@ -121,9 +121,7 @@ class NtfyPlugin(RunnablePlugin): def main(self): if self._subscriptions: self._connect() - - while not self._should_stop.is_set(): - self._should_stop.wait(timeout=1) + self.wait_stop() def stop(self): if self._ws_proc: diff --git a/platypush/schemas/__init__.py b/platypush/schemas/__init__.py index ca2805627..71456ddd9 100644 --- a/platypush/schemas/__init__.py +++ b/platypush/schemas/__init__.py @@ -6,7 +6,7 @@ from dateutil.tz import tzutc from marshmallow import fields -class StrippedString(fields.Function): # lgtm [py/missing-call-to-init] +class StrippedString(fields.Function): # lgtm [py/missing-call-to-init] def __init__(self, *args, **kwargs): kwargs['serialize'] = self._strip kwargs['deserialize'] = self._strip @@ -21,7 +21,15 @@ class StrippedString(fields.Function): # lgtm [py/missing-call-to-init] return value.strip() -class DateTime(fields.Function): # lgtm [py/missing-call-to-init] +class Function(fields.Function): # lgtm [py/missing-call-to-init] + def _get_attr(self, obj, attr: str): + if hasattr(obj, attr): + return getattr(obj, attr) + elif hasattr(obj, 'get'): + return obj.get(attr) + + +class DateTime(Function): # lgtm [py/missing-call-to-init] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.metadata = { @@ -30,7 +38,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init] } def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: - value = normalize_datetime(obj.get(attr)) + value = normalize_datetime(self._get_attr(obj, attr)) if value: return value.isoformat() @@ -38,7 +46,7 @@ class DateTime(fields.Function): # lgtm [py/missing-call-to-init] return normalize_datetime(value) -class Date(fields.Function): # lgtm [py/missing-call-to-init] +class Date(Function): # lgtm [py/missing-call-to-init] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.metadata = { @@ -47,7 +55,7 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init] } def _serialize(self, value, attr, obj, **kwargs) -> Optional[str]: - value = normalize_datetime(obj.get(attr)) + value = normalize_datetime(self._get_attr(obj, attr)) if value: return date(value.year, value.month, value.day).isoformat() @@ -56,10 +64,12 @@ class Date(fields.Function): # lgtm [py/missing-call-to-init] return date.fromtimestamp(dt.timestamp()) -def normalize_datetime(dt: Union[str, date, datetime]) -> Optional[Union[date, datetime]]: +def normalize_datetime( + dt: Optional[Union[str, date, datetime]] +) -> Optional[Union[date, datetime]]: if not dt: return - if isinstance(dt, datetime) or isinstance(dt, date): + if isinstance(dt, (datetime, date)): return dt try: