diff --git a/docs/source/conf.py b/docs/source/conf.py index 1acb634f..0f1f9836 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -306,6 +306,7 @@ autodoc_mock_imports = [ 'uvicorn', 'websockets', 'docutils', + 'aioxmpp', ] sys.path.insert(0, os.path.abspath('../..')) diff --git a/docs/source/events.rst b/docs/source/events.rst index 8c4d1306..fe4fe71e 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -78,6 +78,7 @@ Events platypush/events/web.widget.rst platypush/events/websocket.rst platypush/events/wiimote.rst + platypush/events/xmpp.rst platypush/events/zeroborg.rst platypush/events/zeroconf.rst platypush/events/zigbee.mqtt.rst diff --git a/docs/source/platypush/events/xmpp.rst b/docs/source/platypush/events/xmpp.rst new file mode 100644 index 00000000..9edcd24b --- /dev/null +++ b/docs/source/platypush/events/xmpp.rst @@ -0,0 +1,5 @@ +``event.xmpp`` +============== + +.. automodule:: platypush.message.event.xmpp + :members: diff --git a/docs/source/platypush/plugins/xmpp.rst b/docs/source/platypush/plugins/xmpp.rst new file mode 100644 index 00000000..6988dbfa --- /dev/null +++ b/docs/source/platypush/plugins/xmpp.rst @@ -0,0 +1,5 @@ +``xmpp`` +======== + +.. automodule:: platypush.plugins.xmpp + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index d1713426..cadf898e 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -145,6 +145,7 @@ Plugins platypush/plugins/weather.openweathermap.rst platypush/plugins/websocket.rst platypush/plugins/wiimote.rst + platypush/plugins/xmpp.rst platypush/plugins/zeroconf.rst platypush/plugins/zigbee.mqtt.rst platypush/plugins/zwave.rst diff --git a/platypush/message/event/xmpp.py b/platypush/message/event/xmpp.py new file mode 100644 index 00000000..6f549632 --- /dev/null +++ b/platypush/message/event/xmpp.py @@ -0,0 +1,382 @@ +from abc import ABC +from typing import Iterable, Optional, Union + +from platypush.message.event import Event + + +class XmppEvent(Event, ABC): + """ + Base class for XMPP events. + """ + + def __init__(self, *args, client_jabber_id: str, **kwargs): + """ + :param client_jabber_id: The Jabber ID associated to the client connection. + """ + super().__init__(*args, client_jabber_id=client_jabber_id, **kwargs) + + +class XmppUserEvent(XmppEvent, ABC): + """ + Base class for XMPP user events. + """ + + def __init__(self, *args, user_id: str, jid: Optional[str] = None, **kwargs): + """ + :param user_id: User ID. + :param jid: The full Jabber ID of the user, if the visibility of the + full ID including the client identifier is available. + """ + jid = jid or user_id + super().__init__(*args, user_id=user_id, jid=jid, **kwargs) + + +class XmppRoomEvent(XmppEvent, ABC): + """ + Base class for XMPP room events. + """ + + def __init__(self, *args, room_id: str, **kwargs): + """ + :param room_id: Room ID. + """ + super().__init__(*args, room_id=room_id, **kwargs) + + +class XmppConversationEvent(XmppEvent, ABC): + """ + Base class for XMPP p2p conversation events. + """ + + def __init__(self, *args, conversation_id: str, **kwargs): + """ + :param conversation_id: Conversation ID. + """ + super().__init__(*args, conversation_id=conversation_id, **kwargs) + + +class XmppRoomOccupantEvent(XmppRoomEvent, XmppUserEvent, ABC): + """ + Base class for XMPP events about room members. + """ + + def __init__(self, *args, is_self: bool, **kwargs): + """ + :param is_self: True if the event is about the current user. + """ + super().__init__(*args, is_self=is_self, **kwargs) + + +class XmppConversationMemberEvent(XmppConversationEvent, XmppUserEvent, ABC): + """ + Base class for XMPP events about conversation members. + """ + + def __init__(self, *args, is_self: bool, **kwargs): + """ + :param is_self: True if the event is about the current user. + """ + super().__init__(*args, is_self=is_self, **kwargs) + + +class XmppNickChangedEvent(XmppUserEvent, ABC): + """ + Base class for XMPP nick changed events. + """ + + def __init__( + self, *args, old_nick: Optional[str], new_nick: Optional[str], **kwargs + ): + """ + :param old_nick: Old nick. + :param new_nick: New nick. + """ + super().__init__(*args, old_nick=old_nick, new_nick=new_nick, **kwargs) + + +class XmppConnectedEvent(XmppEvent): + """ + Event triggered when the registered XMPP client connects to the server. + """ + + +class XmppDisconnectedEvent(XmppEvent): + """ + Event triggered when the registered XMPP client disconnects from the server. + """ + + def __init__(self, *args, reason: Optional[Union[str, Exception]] = None, **kwargs): + """ + :param reason: The reason of the disconnection. + """ + super().__init__(*args, reason=str(reason) if reason else None, **kwargs) + + +class XmppUserAvailableEvent(XmppUserEvent): + """ + Event triggered when a user the client is subscribed to becomes available. + """ + + +class XmppUserUnavailableEvent(XmppUserEvent): + """ + Event triggered when a user the client is subscribed to becomes unavailable. + """ + + +class XmppRoomUserAvailableEvent(XmppRoomOccupantEvent): + """ + Event triggered when a user in a joined room becomes available. + """ + + +class XmppRoomUserUnavailableEvent(XmppRoomOccupantEvent): + """ + Event triggered when a user in a joined room becomes unavailable. + """ + + +class XmppMessageReceivedEvent(XmppUserEvent): + """ + Event triggered when the registered XMPP client receives a message. + """ + + def __init__(self, *args, body: str, **kwargs): + """ + :param body: The body of the message. + """ + super().__init__(*args, body=body, **kwargs) + + +class XmppRoomMessageReceivedEvent(XmppMessageReceivedEvent, XmppRoomOccupantEvent): + """ + Event triggered when a message is received on a multi-user conversation + joined by the client. + """ + + +class XmppRoomInviteAcceptedEvent(XmppRoomEvent): + """ + Event triggered when an invite to a room is accepted. + """ + + +class XmppRoomInviteRejectedEvent(XmppRoomEvent): + """ + Event triggered when an invite to a room is rejected. + """ + + +class XmppRoomJoinEvent(XmppRoomOccupantEvent): + """ + Event triggered when a user joins a room. + """ + + def __init__(self, *args, members: Optional[Iterable[str]] = None, **kwargs): + """ + :param members: List of IDs of the joined members. + """ + super().__init__(*args, members=list(set(members or [])), **kwargs) + + +class XmppRoomLeaveEvent(XmppRoomOccupantEvent): + """ + Event triggered when a user leaves a room. + """ + + +class XmppRoomEnterEvent(XmppRoomOccupantEvent): + """ + Event triggered when a user first enters a room. + """ + + +class XmppRoomExitEvent(XmppRoomOccupantEvent): + """ + Event triggered when a user exits a room. + """ + + def __init__(self, *args, reason: Optional[str] = None, **kwargs): + """ + :param reason: Exit reason. + """ + super().__init__(*args, reason=reason, **kwargs) + + +class XmppRoomTopicChangedEvent(XmppRoomEvent): + """ + Event triggered when the topic of a room is changed. + """ + + def __init__( + self, + *args, + topic: Optional[str] = None, + changed_by: Optional[str] = None, + **kwargs + ): + """ + :param topic: New room topic. + :param changed_by: Nick of the user who changed the topic. + """ + super().__init__(*args, topic=topic, changed_by=changed_by, **kwargs) + + +class XmppPresenceChangedEvent(XmppUserEvent): + """ + Event triggered when the reported presence of a user in the contacts list + changes. + """ + + def __init__(self, *args, status: Optional[str], **kwargs): + """ + :param status: New presence status. + """ + super().__init__(*args, status=status, **kwargs) + + +class XmppRoomPresenceChangedEvent(XmppPresenceChangedEvent, XmppRoomEvent): + """ + Event triggered when the reported presence of a user in a room changes. + """ + + +class XmppRoomAffiliationChangedEvent(XmppRoomOccupantEvent): + """ + Event triggered when the affiliation of a user in a room changes. + """ + + def __init__( + self, + *args, + affiliation: str, + changed_by: Optional[str] = None, + reason: Optional[str] = None, + **kwargs + ): + """ + :param affiliation: New affiliation. + :param changed_by: Nick of the user who changed the affiliation. + :param reason: Affiliation change reason. + """ + super().__init__( + *args, + affiliation=affiliation, + changed_by=changed_by, + reason=reason, + **kwargs + ) + + +class XmppRoomRoleChangedEvent(XmppRoomOccupantEvent): + """ + Event triggered when the role of a user in a room changes. + """ + + def __init__( + self, + *args, + role: str, + changed_by: Optional[str] = None, + reason: Optional[str] = None, + **kwargs + ): + """ + :param role: New role. + :param changed_by: Nick of the user who changed the role. + :param reason: Role change reason. + """ + super().__init__( + *args, role=role, changed_by=changed_by, reason=reason, **kwargs + ) + + +class XmppRoomNickChangedEvent(XmppNickChangedEvent, XmppRoomOccupantEvent): + """ + Event triggered when a user in a room changes their nick. + """ + + +class XmppRoomInviteEvent(XmppRoomEvent, XmppUserEvent): + """ + Event triggered when the client is invited to join a room. + """ + + def __init__( + self, + *args, + mode: str, + password: Optional[str] = None, + reason: Optional[str] = None, + **kwargs + ): + """ + :param user_id: The user who sent the invite. + :param mode: Invite mode, either ``DIRECT`` or ``MEDIATED``. + :param password: The room password. + :param reason: Optional invite reason. + """ + super().__init__(*args, mode=mode, password=password, reason=reason, **kwargs) + + +class XmppConversationAddedEvent(XmppConversationEvent): + """ + Event triggered when a conversation is added to the client's list. + """ + + def __init__(self, *args, members: Optional[Iterable[str]] = None, **kwargs): + """ + :param members: Jabber IDs of the conversation members. + """ + super().__init__(*args, members=list(set(members or [])), **kwargs) + + +class XmppConversationEnterEvent(XmppConversationEvent): + """ + Event triggered when the user enters a conversation. + """ + + +class XmppConversationExitEvent(XmppConversationEvent): + """ + Event triggered when the user exits a conversation. + """ + + +class XmppConversationNickChangedEvent( + XmppNickChangedEvent, XmppConversationMemberEvent +): + """ + Event triggered when a user in a p2p conversation changes their nick. + """ + + +class XmppConversationJoinEvent(XmppConversationMemberEvent): + """ + Event triggered when a user enters a conversation. + """ + + +class XmppConversationLeaveEvent(XmppConversationMemberEvent): + """ + Event triggered when the user leaves a conversation. + """ + + +class XmppContactAddRequestEvent(XmppUserEvent): + """ + Event triggered when a user adds the client Jabber ID to their contacts + list. + """ + + +class XmppContactAddRequestAcceptedEvent(XmppUserEvent): + """ + Event triggered when a user contact add request is accepted. + """ + + +class XmppContactAddRequestRejectedEvent(XmppUserEvent): + """ + Event triggered when a user contact add request is rejected. + """ diff --git a/platypush/plugins/light/hue/__init__.py b/platypush/plugins/light/hue/__init__.py index 527322f1..deef5466 100644 --- a/platypush/plugins/light/hue/__init__.py +++ b/platypush/plugins/light/hue/__init__.py @@ -896,7 +896,7 @@ class LightHuePlugin(RunnablePlugin, LightEntityManager): :type duration: float :param hue_range: If you selected a ``color_transition``, this will - specify the hue range of your color ``color_transition``. + specify the hue range of your color ``color_transition``. Default: [0, 65535] :type hue_range: list[int] diff --git a/platypush/plugins/xmpp/__init__.py b/platypush/plugins/xmpp/__init__.py new file mode 100644 index 00000000..2d8ecc9d --- /dev/null +++ b/platypush/plugins/xmpp/__init__.py @@ -0,0 +1,732 @@ +import os +from typing import Iterable, Optional, Type, Union +from typing_extensions import override + +import aioxmpp +import aioxmpp.im + +from platypush.config import Config +from platypush.message.event.xmpp import XmppConnectedEvent +from platypush.plugins import AsyncRunnablePlugin, action + +from ._base import XmppBasePlugin +from ._config import XmppConfig +from ._handlers import ( + XmppBaseHandler, + XmppConnectionHandler, + XmppConversationHandler, + XmppHandlersRegistry, + XmppPresenceHandler, + XmppRoomHandler, + XmppRosterHandler, + discover_handlers, +) +from ._mixins import XmppBaseMixin +from ._state import SerializedState, StateSerializer +from ._types import Errors, XmppPresence + + +# pylint: disable=too-many-ancestors +class XmppPlugin(AsyncRunnablePlugin, XmppBasePlugin): + """ + XMPP integration. + + Requires: + + * **aioxmpp** (``pip install aioxmpp``) + * **pytz** (``pip install pytz``) + + Triggers: + + * :class:`platypush.message.event.xmpp.XmppConnectedEvent` + * :class:`platypush.message.event.xmpp.XmppContactAddRequestAcceptedEvent` + * :class:`platypush.message.event.xmpp.XmppContactAddRequestEvent` + * :class:`platypush.message.event.xmpp.XmppContactAddRequestRejectedEvent` + * :class:`platypush.message.event.xmpp.XmppConversationAddedEvent` + * :class:`platypush.message.event.xmpp.XmppConversationEnterEvent` + * :class:`platypush.message.event.xmpp.XmppConversationExitEvent` + * :class:`platypush.message.event.xmpp.XmppConversationJoinEvent` + * :class:`platypush.message.event.xmpp.XmppConversationLeaveEvent` + * :class:`platypush.message.event.xmpp.XmppConversationNickChangedEvent` + * :class:`platypush.message.event.xmpp.XmppDisconnectedEvent` + * :class:`platypush.message.event.xmpp.XmppMessageReceivedEvent` + * :class:`platypush.message.event.xmpp.XmppPresenceChangedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomAffiliationChangedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomEnterEvent` + * :class:`platypush.message.event.xmpp.XmppRoomExitEvent` + * :class:`platypush.message.event.xmpp.XmppRoomInviteAcceptedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomInviteEvent` + * :class:`platypush.message.event.xmpp.XmppRoomInviteRejectedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomJoinEvent` + * :class:`platypush.message.event.xmpp.XmppRoomLeaveEvent` + * :class:`platypush.message.event.xmpp.XmppRoomMessageReceivedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomNickChangedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomPresenceChangedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomRoleChangedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomTopicChangedEvent` + * :class:`platypush.message.event.xmpp.XmppRoomUserAvailableEvent` + * :class:`platypush.message.event.xmpp.XmppRoomUserUnavailableEvent` + * :class:`platypush.message.event.xmpp.XmppUserAvailableEvent` + * :class:`platypush.message.event.xmpp.XmppUserUnavailableEvent` + + """ + + def __init__( + self, + user_id: str, + password: Optional[str] = None, + language: Optional[str] = None, + anonymous: bool = False, + auto_accept_invites: bool = True, + restore_state: bool = True, + state_file: Optional[str] = None, + **kwargs, + ): + """ + :param user_id: Jabber/user ID, in the format ``user@example.org``. + :param password: User password. + :param language: ISO string for the language code that will be used by + the bot (default: ``None``). + :param anonymous: Whether to use anonymous authentication (default: + ``False``). + :param auto_accept_invites: Whether to automatically accept invites to + conversations (default: True). If set to False, and you still want + some control on which invites should be accepted, you can create a + ``hook`` on + :class:`platypush.message.event.xmpp.XmppRoomInviteEvent` that + calls either :meth:`.accept_invite` or :meth:`.reject_invite` with + the ``room_id`` specified on the event, if it is a room event, or + subscribe to + :class:`platypush.message.event.xmpp.XmppContactAddRequestEvent` + and call either :meth:`.accept_invite` or :meth:`.reject_invite` + with the ``user_id`` specified on the event, if it is a contact add + request. + :param restore_state: If ``True`` (default) then any previously joined + conversation or subscribed contact will be joined/subscribed again + when the plugin restarts. Otherwise, upon restart the plugin will + start from a state with no subscriptions nor joined rooms. + :param state_file: Path where the previous state will be stored, if + ``restore_state`` is ``True``. Default: + ``/xmpp/state.json``. + """ + super(XmppBasePlugin, self).__init__(user_id=user_id, language=language) + super(AsyncRunnablePlugin, self).__init__(**kwargs) + + self._security = aioxmpp.make_security_layer(password, anonymous=anonymous) + self._config = XmppConfig( + auto_accept_invites=auto_accept_invites, + restore_state=restore_state, + state_file=os.path.expanduser( + state_file or os.path.join(Config.workdir, 'xmpp', 'state.json') + ), + ) + self._loaded_state = SerializedState() + self._state_serializer = StateSerializer(user_id=self._jid, config=self._config) + self._handlers = XmppHandlersRegistry(self) + self.restore_state() + + def restore_state(self): + """ + Reload the previous state from the configured state file. + """ + if not (self._config.state_file and self._config.restore_state): + return + + self._loaded_state = self._state_serializer.load() + + @property + def _conn_handler(self) -> XmppConnectionHandler: + return self._handlers[XmppConnectionHandler] + + @property + def _conv_handler(self) -> XmppConversationHandler: + return self._handlers[XmppConversationHandler] + + @property + def _presence_handler(self) -> XmppPresenceHandler: + return self._handlers[XmppPresenceHandler] + + @property + def _room_handler(self) -> XmppRoomHandler: + return self._handlers[XmppRoomHandler] + + @property + def _roster_handler(self) -> XmppRosterHandler: + return self._handlers[XmppRosterHandler] + + def _on_disconnect(self, reason: Optional[Union[str, Exception]] = None): + self._conn_handler.disconnect(reason) + + def _register_handlers(self): + for hndl_type in discover_handlers(): + hndl = self.register_xmpp_handler(hndl_type) + hndl.restore_state() + + def register_xmpp_handler(self, hndl_type: Type[XmppBaseMixin]) -> XmppBaseHandler: + self.logger.debug('Registering handler: %s', hndl_type) + self._handlers[hndl_type] = hndl_type( + user_id=self._jid, + language=self._lang, + config=self._config, + state=self._state, + client=self._client, + loop=self._loop, + state_serializer=self._state_serializer, + loaded_state=self._loaded_state, + ) + + return self._handlers[hndl_type] + + @override + def should_stop(self) -> bool: + return super().should_stop() or self._state.should_stop.is_set() + + @override + def stop(self): + self._state.should_stop.set() + self._stop_state_serializer() + self._stop_client() + self._on_disconnect(reason='Plugin terminated') + super().stop() + + def _stop_state_serializer(self): + if self._state_serializer: + self._state_serializer.flush() + self._state_serializer.wait(self._state_serializer.flush_timeout) + + def _stop_client(self): + if self._client: + self._client.stop() + self._client = None + + @override + async def listen(self): + self._client = aioxmpp.PresenceManagedClient(self._jid, self._security) + + try: + async with self._client.connected(): + self._register_handlers() + self._post_event(XmppConnectedEvent) + await self._state.should_stop.wait() + except Exception as e: + self.logger.warning('XMPP connection error: %s', e) + self.logger.exception(e) + self._on_disconnect(e) + raise e + + @action + def send_message( + self, + body: str, + user_id: Optional[str] = None, + room_id: Optional[str] = None, + language: Optional[str] = None, + ): + """ + Send a message to a target (the Jabber ID of another user or room). + + :param body: Message body. + :param user_id: Jabber ID of the target user. Either user_id or room_id + should be specified. + :param room_id: Jabber ID of the target room. Either user_id or room_id + should be specified. + :param language: Override the default language code. + """ + if room_id: + self._room_handler.send_message( + room_id=room_id, body=body, language=language + ) + elif user_id: + self._conv_handler.send_message( + user_id=user_id, body=body, language=language + ) + else: + raise AssertionError(Errors.USER_ID_OR_ROOM_ID) + + @action + def join( + self, + room_id: str, + nick: Optional[str] = None, + password: Optional[str] = None, + auto_rejoin: bool = True, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Join a room/conversation. + + :param room_id: The Jabber ID of the conversation to join. + :param nick: The nickname that the bot should use in the room (default: + the nickname specified in the configuration's ``user_id`` + parameter). + :param password: The password of the room (default: None). + :param auto_rejoin: Whether to automatically rejoin the room after + disconnection/kick (default: True). + :param timeout: Room join timeout (default: 20 seconds). Set to null + for no timeout. + """ + nick = nick or self._jid.localpart + self._async_run( + self._room_handler.join, + room_id, + timeout=timeout, + nick=nick, + password=password, + auto_rejoin=auto_rejoin, + ) + + @action + def leave( + self, room_id: str, timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT + ): + """ + Leave a room/conversation. + + :param room_id: The Jabber ID of the conversation to leave. + :param timeout: Room leave timeout (default: 20 seconds). Set to null + for no timeout. + """ + self._async_run( + self._room_handler.leave, + room_id, + timeout=timeout, + ) + + @action + def accept_invite( + self, room_id: Optional[str] = None, user_id: Optional[str] = None + ): + """ + Accept a pending invite to a multi-user conversation or a contact add + request. + + :param user_id: The target ``user_id`` if this is a contact add request. + :param room_id: The target ``room_id`` if this is a room invite request. + """ + if room_id: + self._room_handler.accept_invite(room_id) + elif user_id: + self._roster_handler.accept_invite(user_id) + else: + raise AssertionError(Errors.USER_ID_OR_ROOM_ID) + + @action + def reject_invite( + self, room_id: Optional[str] = None, user_id: Optional[str] = None + ): + """ + Reject a pending invite to a multi-user conversation or a contact add + request. + + :param user_id: The target ``user_id`` if this is a contact add request. + :param room_id: The target ``room_id`` if this is a room invite request. + """ + if room_id: + self._room_handler.reject_invite(room_id) + elif user_id: + self._roster_handler.reject_invite(user_id) + else: + raise AssertionError(Errors.USER_ID_OR_ROOM_ID) + + @action + def invite( + self, + room_id: str, + user_id: str, + mode: str = 'direct', + text: Optional[str] = None, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Invite a user to a room. + + :param room_id: The target room JID. + :param user_id: The JID of the user to invite. + :param timeout: Invite request send timeout (default: 20 seconds). Set + to null for no timeout. + :param mode: Invite mode - can be ``direct`` (default) or ``mediated``. + + - ``direct``: The invitation is sent directly to the invitee, + without going through a service specific to the conversation. + + - ``mediated``: The invitation is sent indirectly through a service + which is providing the conversation. Advantages of using this mode + include most notably that the service can automatically add the + invitee to the list of allowed participants in configurations + where such restrictions exist (or deny the request if the inviter + does not have the permissions to do so). + :param text: Optional text to send with the invitation. + """ + self._async_run( + self._room_handler.invite, + room_id=room_id, + user_id=aioxmpp.JID.fromstr(user_id), + mode=getattr( + aioxmpp.im.InviteMode, mode.upper(), aioxmpp.im.InviteMode.DIRECT + ), + text=text, + timeout=timeout, + ) + + @action + def set_presence(self, presence: Union[str, XmppPresence]): + """ + Set/broadcast a new presence state for the user. + + :param presence: The new presence state. Possible values are: + + - ``available`` + - ``offline`` + - ``away`` + - ``xa`` + - ``chat`` + - ``dnd`` + + """ + pres = XmppPresence(presence.lower()) if isinstance(presence, str) else presence + self._presence_handler.set_presence(pres) + + @action + def set_affiliation( + self, + room_id: str, + user_id: str, + affiliation: str, + reason: Optional[str] = None, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Change the affiliation of a user to a room. + + :param room_id: The target room JID. + :param user_id: The user JID. + :param affiliation: The affiliation to set. Possible values are: + + - ``owner`` + - ``member`` + - ``none`` + - ``outcast`` + - ``publisher`` + - ``publish-only`` + + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + :param reason: Optional reason for the change. + """ + self._async_run( + self._room_handler.set_affiliation, + room_id=room_id, + user_id=aioxmpp.JID.fromstr(user_id), + affiliation=affiliation, + reason=reason, + timeout=timeout, + ) + + @action + def set_role( + self, + room_id: str, + user_id: str, + role: str, + reason: Optional[str] = None, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Change the role of a user in a room. + + :param room_id: The target room JID. + :param user_id: The user JID. + :param role: The role to set. Possible values are: + + - ``none`` + - ``participant`` + - ``visitor`` + - ``moderator`` + + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + :param reason: Optional reason for the change. + """ + self._async_run( + self._room_handler.set_role, + room_id=room_id, + user_id=aioxmpp.JID.fromstr(user_id), + role=role, + reason=reason, + timeout=timeout, + ) + + @action + def kick( + self, + room_id: str, + user_id: str, + reason: Optional[str] = None, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Kick a user from a room. + + :param room_id: The target room JID. + :param user_id: The JID of the user to kick. + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + :param reason: Kick reason. + """ + self._async_run( + self._room_handler.kick, + room_id=room_id, + user_id=aioxmpp.JID.fromstr(user_id), + reason=reason, + timeout=timeout, + ) + + @action + def ban( + self, + room_id: str, + user_id: str, + reason: Optional[str] = None, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Ban a user from a room. + + :param room_id: The target room JID. + :param user_id: The JID of the user to ban. + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + :param reason: Ban reason. + """ + self._async_run( + self._room_handler.ban, + room_id=room_id, + user_id=aioxmpp.JID.fromstr(user_id), + reason=reason, + timeout=timeout, + ) + + @action + def set_topic( + self, + room_id: str, + topic: str, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Set the topic of a room. + + :param room_id: The target room JID. + :param topic: New topic. + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + """ + self._async_run( + self._room_handler.set_topic, + room_id=room_id, + topic=topic, + timeout=timeout, + ) + + @action + def set_room_configuration( + self, + room_id: str, + name: Optional[bool] = None, + description: Optional[bool] = None, + members_only: Optional[bool] = None, + persistent: Optional[bool] = None, + moderated: Optional[bool] = None, + allow_invites: Optional[bool] = None, + allow_private_messages: Optional[bool] = None, + allow_change_subject: Optional[bool] = None, + enable_logging: Optional[bool] = None, + max_history_fetch: Optional[int] = None, + max_users: Optional[int] = None, + password_protected: Optional[bool] = None, + public: Optional[bool] = None, + room_admins: Optional[Iterable[str]] = None, + room_owners: Optional[Iterable[str]] = None, + password: Optional[str] = None, + language: Optional[str] = None, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Changes the configuration of a room. + + All the parameters are optional, and only those that have a non-null + value will be set. + + :param room_id: The target room JID. + :param name: New room name. + :param description: New room description. + :param members_only: Whether or not this room is only for members. + :param persistent: Whether or not this room is persistent. + :param moderated: Whether or not this room is moderated. + :param allow_invites: Whether or not this room allows invites. + :param allow_private_messages: Whether or not this room allows private + messages. + :param allow_change_subject: Whether or not this room allows changing + its subject. + :param enable_logging: Whether or not this room has logging enabled. + :param max_history_fetch: Maximum number of past messages to fetch when + joining the room. + :param max_users: Maximum number of users allowed in the room. + :param password_protected: Whether or not this room is password protected. + :param public: Whether or not this room is publicly visible. + :param room_admins: List of room admins, by Jabber ID. + :param room_owners: List of room owners, by Jabber ID. + :param password: If the room is password protected, configure its + password here. + :param language: Language of the room (ISO 2-letter code). + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + """ + self._async_run( + self._room_handler.set_room_config, + room_id=room_id, + name=name, + description=description, + members_only=members_only, + persistent=persistent, + moderated=moderated, + allow_invites=allow_invites, + allow_private_messages=allow_private_messages, + allow_change_subject=allow_change_subject, + enable_logging=enable_logging, + max_history_fetch=max_history_fetch, + max_users=max_users, + password_protected=password_protected, + public=public, + room_admins=room_admins, + room_owners=room_owners, + password=password, + language=language, + timeout=timeout, + ) + + @action + def set_nick( + self, + room_id: str, + nick: str, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + ): + """ + Set the nick of the user on a specific room. + + :param room_id: The target room JID. + :param nick: New nick. + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + """ + self._async_run( + self._room_handler.set_nick, + room_id=room_id, + nick=nick, + timeout=timeout, + ) + + @action + def add_user(self, user_id: str): + """ + Add the specified user ID to the roster. + + :param user_id: The Jabber ID of the user to add. + """ + self._roster_handler.add_user(user_id) + + @action + def remove_user(self, user_id: str): + """ + Remove the specified user ID from the roster. + + :param user_id: The Jabber ID of the user to remove. + """ + self._roster_handler.remove_user(user_id) + + @action + def request_voice( + self, room_id: str, timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT + ): + """ + Request voice (i.e. participant role) in a room. + + :param room_id: The Jabber ID of the room. + :param timeout: Request timeout (default: 20 seconds). Set to null for + no timeout. + """ + self._async_run( + self._room_handler.request_voice, room_id=room_id, timeout=timeout + ) + + @action + def status(self): + """ + Get the current status of the client. + + :return: + + .. code-block:: python + + { + # List of pending room invites, as Jabber IDs + "room_invites": ["bar@conference.xmpp.example.org"], + + # List of pending user invites, as Jabber IDs + "user_invites": ["ignore-me@example.org"], + + # List of users the client is subscribed to + "users": [ + "buddy@example.org" + ], + + # Map of rooms the client has joined, indexed by room ID + "rooms": { + "tests@conference.xmpp.manganiello.tech": { + "room_id": "foo@conference.xmpp.example.org", + "joined": true, + # Possible values: + # ACTIVE, DISCONNECTED, HISTORY, JOIN_PRESENCE + "state": "ACTIVE", + "nick": "me", + + # Map of room members, indexed by user ID + "members": { + "me@example.org": { + "user_id": "me@example.org", + "nick": "me", + # Possible affiliation values: + # none, member, outcast, owner, publisher, publish-only + "affiliation": "none", + + # Possible role values: + # none, participant, visitor, moderator + "role": "participant", + "is_self": true, + "available": true, + + # Possible state values: + # available, offline, away, xa, chat, dnd + "state": "available" + }, + + "buddy@example.org": { + "user_id": "buddy@example.org", + "nick": "SomeBuddy", + "affiliation": "owner", + "role": "moderator", + "is_self": false, + "available": true, + "state": "away" + } + } + } + } + } + + """ + return self._state.asdict(return_passwords=False) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/xmpp/_base.py b/platypush/plugins/xmpp/_base.py new file mode 100644 index 00000000..192ac208 --- /dev/null +++ b/platypush/plugins/xmpp/_base.py @@ -0,0 +1,14 @@ +from abc import ABC, abstractmethod +from typing import Type + +from ._mixins import XmppAsyncMixin, XmppBaseMixin, XmppConfigMixin, XmppEventStateMixin + + +class XmppBasePlugin(XmppAsyncMixin, XmppConfigMixin, XmppEventStateMixin, ABC): + """ + Base interface for the XMPP plugin. + """ + + @abstractmethod + def register_handler(self, hndl_type: Type[XmppBaseMixin]): + raise NotImplementedError diff --git a/platypush/plugins/xmpp/_config.py b/platypush/plugins/xmpp/_config.py new file mode 100644 index 00000000..6f3f0a28 --- /dev/null +++ b/platypush/plugins/xmpp/_config.py @@ -0,0 +1,25 @@ +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class XmppConfig: + """ + Data class that models the XMPP configuration shared across all submodules. + """ + + auto_accept_invites: bool = True + """ + Whether or not to automatically accept invites to rooms and buddy lists. + """ + + restore_state: bool = True + """ + Whether to restore the previous state of the joined rooms and subscriptions + upon application restart. + """ + + state_file: Optional[str] = None + """ + The path where the state of the client is persisted across sessions. + """ diff --git a/platypush/plugins/xmpp/_handlers/__init__.py b/platypush/plugins/xmpp/_handlers/__init__.py new file mode 100644 index 00000000..6c033325 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/__init__.py @@ -0,0 +1,26 @@ +from ._base import XmppBaseHandler +from ._connection import XmppConnectionHandler +from ._conversation import XmppConversationHandler +from ._discover import discover_handlers +from ._message import XmppMessageHandler +from ._ping import XmppPingHandler +from ._presence import XmppPresenceHandler +from ._pubsub import XmppPubSubHandler +from ._registry import XmppHandlersRegistry +from ._room import XmppRoomHandler +from ._roster import XmppRosterHandler + + +__all__ = [ + "XmppBaseHandler", + "XmppConnectionHandler", + "XmppConversationHandler", + "XmppHandlersRegistry", + "XmppMessageHandler", + "XmppPingHandler", + "XmppPresenceHandler", + "XmppPubSubHandler", + "XmppRoomHandler", + "XmppRosterHandler", + "discover_handlers", +] diff --git a/platypush/plugins/xmpp/_handlers/_base.py b/platypush/plugins/xmpp/_handlers/_base.py new file mode 100644 index 00000000..53509f7e --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_base.py @@ -0,0 +1,21 @@ +from abc import ABC, abstractmethod + +import aioxmpp + +from .._mixins import XmppAsyncMixin, XmppConfigMixin, XmppEventStateMixin + + +# pylint: disable=too-few-public-methods +class XmppBaseHandler(XmppAsyncMixin, XmppConfigMixin, XmppEventStateMixin, ABC): + """ + Base class for XMPP handlers. + """ + + _client: aioxmpp.Client + + @abstractmethod + def __init__(self, *args, **kwargs): + """ + To be implemented by the subclasses. + """ + super().__init__(*args, **kwargs) diff --git a/platypush/plugins/xmpp/_handlers/_connection.py b/platypush/plugins/xmpp/_handlers/_connection.py new file mode 100644 index 00000000..11971134 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_connection.py @@ -0,0 +1,28 @@ +from typing import Optional, Union + +from platypush.message.event.xmpp import XmppDisconnectedEvent + +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors +class XmppConnectionHandler(XmppBaseHandler): + """ + Handler for XMPP connection/disconnection events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._client.on_failure.connect(self._on_disconnect()) # type: ignore + self._client.on_stopped.connect(self._on_disconnect()) # type: ignore + + def _on_disconnect(self, reason: Optional[Union[str, Exception]] = None): + def callback(*_, **__): + if not self._state.disconnect_notified.is_set(): + self._post_event(XmppDisconnectedEvent, reason=reason) + self._state.disconnect_notified.set() + + return callback + + def disconnect(self, reason: Optional[Union[str, Exception]] = None): + self._on_disconnect(reason=reason)() diff --git a/platypush/plugins/xmpp/_handlers/_conversation.py b/platypush/plugins/xmpp/_handlers/_conversation.py new file mode 100644 index 00000000..e9ed69de --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_conversation.py @@ -0,0 +1,106 @@ +from typing import Optional + +import aioxmpp +import aioxmpp.im.p2p + +from platypush.message.event.xmpp import ( + XmppConversationAddedEvent, + XmppConversationEnterEvent, + XmppConversationExitEvent, + XmppConversationJoinEvent, + XmppConversationLeaveEvent, +) + +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors +class XmppConversationHandler(XmppBaseHandler): + """ + Handler for XMPP conversation events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.conversation: aioxmpp.im.ConversationService = self._client.summon( + aioxmpp.im.ConversationService + ) + self.conversation.on_conversation_added.connect(self._on_conversation_added) # type: ignore + + def _on_conversation_added( + self, conversation: aioxmpp.im.p2p.Conversation, *_, **__ + ): + if not isinstance(conversation, aioxmpp.im.p2p.Conversation): + return # Don't add signals to rooms - they'll have their own + + conversation_id = str(conversation.jid) + if not self._state.conversations.get(conversation_id): + self._register_conversation_events(conversation) + self._state.users.add(conversation_id) + self._state.conversations[conversation_id] = conversation + self._post_conversation_event( + XmppConversationAddedEvent, + conversation=conversation, + members=[ + str(m.direct_jid if m.direct_jid else m.conversation_jid) + for m in conversation.members + ], + ) + + def _register_conversation_events(self, conversation: aioxmpp.im.p2p.Conversation): + if not isinstance(conversation, aioxmpp.im.p2p.Conversation): + return # Don't add signals to rooms - they'll have their own + + conversation.on_enter.connect(self._on_conversation_enter(conversation)) # type: ignore + conversation.on_exit.connect(self._on_conversation_exit(conversation)) # type: ignore + conversation.on_join.connect(self._on_conversation_join(conversation)) # type: ignore + conversation.on_leave.connect(self._on_conversation_leave(conversation)) # type: ignore + + def _on_conversation_enter(self, conversation: aioxmpp.im.p2p.Conversation): + def callback(*_, **__): + self._post_conversation_event(XmppConversationEnterEvent, conversation) + + return callback + + def _on_conversation_exit(self, conversation: aioxmpp.im.p2p.Conversation): + def callback(*_, **__): + self._post_conversation_event(XmppConversationExitEvent, conversation) + + return callback + + def _on_conversation_join(self, conversation: aioxmpp.im.p2p.Conversation): + def callback(member: aioxmpp.im.p2p.Member, *_, **__): + self._post_conversation_member_event( + XmppConversationJoinEvent, conversation=conversation, member=member + ) + + return callback + + def _on_conversation_leave(self, conversation: aioxmpp.im.p2p.Conversation): + def callback(member: aioxmpp.im.p2p.Member, *_, **__): + if member.is_self: + user_id = str(conversation.jid) + # Remove the conversation from the map of active conversations + self._state.conversations.pop(user_id, None) + self._state.users = self._state.users.difference({user_id}) + + self._post_conversation_member_event( + XmppConversationLeaveEvent, + conversation=conversation, + member=member, + ) + + return callback + + def send_message( + self, + user_id: str, + body: str, + language: Optional[str] = None, + ): + lang = language or self._lang + msg = aioxmpp.Message( + type_=aioxmpp.MessageType.CHAT, to=aioxmpp.JID.fromstr(user_id) + ) + msg.body.update({lang: body}) + self._client.enqueue(msg) diff --git a/platypush/plugins/xmpp/_handlers/_discover.py b/platypush/plugins/xmpp/_handlers/_discover.py new file mode 100644 index 00000000..20afea0d --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_discover.py @@ -0,0 +1,25 @@ +import importlib +import inspect +import os +from typing import List, Type + +import pkgutil + +from ._base import XmppBaseHandler + + +def discover_handlers() -> List[Type[XmppBaseHandler]]: + """ + Discover the handler classes defined in this module. + """ + + base_pkg = '.'.join(__name__.split('.')[:-1]) + base_dir = os.path.dirname(__file__) + return [ + obj + for _, mod_name, _ in pkgutil.walk_packages([base_dir], prefix=base_pkg + '.') + for _, obj in inspect.getmembers(importlib.import_module(mod_name)) + if inspect.isclass(obj) + and not inspect.isabstract(obj) + and issubclass(obj, XmppBaseHandler) + ] diff --git a/platypush/plugins/xmpp/_handlers/_message.py b/platypush/plugins/xmpp/_handlers/_message.py new file mode 100644 index 00000000..ecece945 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_message.py @@ -0,0 +1,38 @@ +import aioxmpp +import aioxmpp.dispatcher + +from platypush.message.event.xmpp import ( + XmppMessageReceivedEvent, +) + +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors,too-few-public-methods +class XmppMessageHandler(XmppBaseHandler): + """ + Handler for XMPP message events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.dispatcher = self._client.summon( + aioxmpp.dispatcher.SimpleMessageDispatcher + ) + self.dispatcher.register_callback( + aioxmpp.MessageType.CHAT, + None, # from filter + self._on_msg_received, + ) + + def _on_msg_received(self, msg, *_, **__): + if not msg.body: + return + + if msg.error: + self.logger.warning('Error on message from %s: %s', msg.from_, msg.error) + + body = msg.body.lookup([aioxmpp.structs.LanguageRange.fromstr('*')]) + self._post_user_event( + XmppMessageReceivedEvent, user_id=msg.from_, body=body.rstrip() + ) diff --git a/platypush/plugins/xmpp/_handlers/_ping.py b/platypush/plugins/xmpp/_handlers/_ping.py new file mode 100644 index 00000000..fee83fe2 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_ping.py @@ -0,0 +1,14 @@ +import aioxmpp + +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors,too-few-public-methods +class XmppPingHandler(XmppBaseHandler): + """ + Handler for the XMPP ping logic. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._ping: aioxmpp.PingService = self._client.summon(aioxmpp.PingService) diff --git a/platypush/plugins/xmpp/_handlers/_presence.py b/platypush/plugins/xmpp/_handlers/_presence.py new file mode 100644 index 00000000..05fd2681 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_presence.py @@ -0,0 +1,98 @@ +from typing import Union + +import aioxmpp + +from platypush.message.event.xmpp import ( + XmppPresenceChangedEvent, + XmppRoomUserAvailableEvent, + XmppRoomUserUnavailableEvent, + XmppUserAvailableEvent, + XmppUserUnavailableEvent, +) + +from .._types import XmppPresence +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors +class XmppPresenceHandler(XmppBaseHandler): + """ + Handler for XMPP presence events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.presence_client: aioxmpp.PresenceClient = self._client.summon( + aioxmpp.PresenceClient + ) + self.presence_client.on_changed.connect(self._on_presence_changed) # type: ignore + self.presence_client.on_available.connect(self._on_presence_available) # type: ignore + self.presence_client.on_unavailable.connect(self._on_presence_unavailable) # type: ignore + + self.presence_server: aioxmpp.PresenceServer = self._client.summon( + aioxmpp.PresenceServer + ) + + def _is_room_event(self, jid: aioxmpp.JID) -> bool: + jid_str = str(jid.replace(resource=None)) # type: ignore + return ( + self._state.rooms.get(jid_str) is not None + or jid_str in self._state.pending_rooms + ) + + def _on_presence_changed( + self, + user_id: Union[str, aioxmpp.JID], + presence: aioxmpp.stanza.Presence, + **_, + ): + if isinstance(user_id, aioxmpp.JID) and self._is_room_event(user_id): + return # Rooms will have their own presence changed events + + self._post_user_event( + XmppPresenceChangedEvent, + user_id=user_id, + status=aioxmpp.PresenceShow(presence.show).value + or XmppPresence.AVAILABLE.value, + ) + + def _on_presence_available(self, user_id: aioxmpp.JID, *_, **__): + jid = str(user_id) + if self._is_room_event(user_id): + self._post_user_event( + XmppRoomUserAvailableEvent, + user_id=jid, + room_id=str(user_id.replace(resource=None)), + is_self=jid == str(self._jid), + ) + elif jid in self._state.users: + self._state.users.add(jid) + self._post_user_event(XmppUserAvailableEvent, user_id=user_id) + + def _on_presence_unavailable(self, user_id: aioxmpp.JID, *_, **__): + jid = str(user_id) + if self._is_room_event(user_id): + self._post_user_event( + XmppRoomUserUnavailableEvent, + user_id=jid, + room_id=str(user_id.replace(resource=None)), + is_self=jid == str(self._jid), + ) + elif jid in self._state.users: + self._post_user_event(XmppUserUnavailableEvent, user_id=user_id) + + def set_presence(self, presence: XmppPresence): + available = presence.value != XmppPresence.OFFLINE.value + presence_show = aioxmpp.PresenceShow( + None + if presence.value + in {XmppPresence.AVAILABLE.value, XmppPresence.OFFLINE.value} + else presence.value + ) + + self.presence_server.set_presence( + aioxmpp.PresenceState( + available=available, + show=presence_show, + ) + ) diff --git a/platypush/plugins/xmpp/_handlers/_pubsub.py b/platypush/plugins/xmpp/_handlers/_pubsub.py new file mode 100644 index 00000000..9a72f5ce --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_pubsub.py @@ -0,0 +1,14 @@ +import aioxmpp + +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors,too-few-public-methods +class XmppPubSubHandler(XmppBaseHandler): + """ + Handler for XMPP pub/sub events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.pubsub: aioxmpp.PubSubClient = self._client.summon(aioxmpp.PubSubClient) diff --git a/platypush/plugins/xmpp/_handlers/_registry.py b/platypush/plugins/xmpp/_handlers/_registry.py new file mode 100644 index 00000000..30079895 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_registry.py @@ -0,0 +1,20 @@ +from collections import defaultdict +from typing import Type +from typing_extensions import override + +from .._base import XmppBasePlugin +from ._base import XmppBaseHandler + + +class XmppHandlersRegistry(defaultdict): + """ + A registry of the initialized XMPP handlers. + """ + + def __init__(self, plugin: XmppBasePlugin): + super().__init__() + self._plugin = plugin + + @override + def __missing__(self, hndl_type: Type[XmppBaseHandler]) -> XmppBaseHandler: + return self._plugin.register_handler(hndl_type) diff --git a/platypush/plugins/xmpp/_handlers/_room.py b/platypush/plugins/xmpp/_handlers/_room.py new file mode 100644 index 00000000..5858c0c5 --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_room.py @@ -0,0 +1,549 @@ +import asyncio +from typing import Iterable, Optional, Union +from typing_extensions import override + +import aioxmpp +import aioxmpp.im +import aioxmpp.muc.xso + +from platypush.message.event.xmpp import ( + XmppRoomAffiliationChangedEvent, + XmppRoomInviteEvent, + XmppRoomInviteAcceptedEvent, + XmppRoomInviteRejectedEvent, + XmppRoomEnterEvent, + XmppRoomExitEvent, + XmppRoomJoinEvent, + XmppRoomLeaveEvent, + XmppRoomMessageReceivedEvent, + XmppRoomNickChangedEvent, + XmppRoomPresenceChangedEvent, + XmppRoomRoleChangedEvent, + XmppRoomTopicChangedEvent, +) + +from .._types import Errors, XmppPresence +from ._base import XmppBaseHandler + + +class XmppRoomHandler(XmppBaseHandler): + """ + Handler for XMPP room events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.muc_client: aioxmpp.MUCClient = self._client.summon(aioxmpp.MUCClient) + self.muc_client.on_muc_invitation.connect(self._on_muc_invitation) # type: ignore + + async def _restore_state(self): + if self._loaded_state.rooms: + await asyncio.gather( + *[ + self.join( + room_id, + nick=room.nick if room.nick else self._jid.localpart, + password=room.password, + ) + for room_id, room in self._loaded_state.rooms.items() + ] + ) + + @override + def restore_state(self): + self._async_run(self._restore_state, wait_result=False) + + def _on_muc_invitation( + self, + _: aioxmpp.stanza.Message, + room_id: aioxmpp.JID, + inviter: aioxmpp.JID, + mode: aioxmpp.im.InviteMode, + password: Optional[str] = None, + reason: Optional[str] = None, + **__, + ): + def join(): + assert self._loop, Errors.LOOP + nick = self._jid.localpart + self._async_run( + self.join, + room_id=jid, + nick=nick, + password=password, + timeout=self.DEFAULT_TIMEOUT, + ) + + self._state.pending_rooms.add(jid) + self._state.room_invites.pop(jid, None) + self._post_event(XmppRoomInviteAcceptedEvent, room_id=jid) + + def reject(): + self._state.room_invites.pop(jid, None) + self._post_event(XmppRoomInviteRejectedEvent, room_id=jid) + + jid = str(room_id) + invite = self._state.room_invites[jid] + self._post_user_event( + XmppRoomInviteEvent, + room_id=jid, + user_id=inviter, + mode=mode.name, + password=password, + reason=reason, + ) + + invite.on_accepted = join + invite.on_rejected = reject + if self._config.auto_accept_invites: + invite.accept() + + def _get_occupant_by_jid( + self, user_id: str, room: aioxmpp.muc.Room + ) -> aioxmpp.muc.service.Occupant: + occupant = next( + iter( + m + for m in room.members + if str(m.conversation_jid) == user_id or str(m.direct_jid) == user_id + ), + None, + ) + + assert occupant, Errors.NO_USER + return occupant + + async def join( + self, + room_id: str, + nick: Optional[str] = None, + password: Optional[str] = None, + auto_rejoin: bool = True, + ): + address = aioxmpp.JID.fromstr(room_id) + room, future = self.muc_client.join( + address, + nick=nick, + password=password, + autorejoin=auto_rejoin, + ) + + await future + await self._register_room(room) + return room + + async def leave(self, room_id: str): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + await room.leave() + self._unregister_room(room) + + async def invite( + self, + user_id: aioxmpp.JID, + room_id: str, + mode: aioxmpp.im.InviteMode = aioxmpp.im.InviteMode.DIRECT, + text: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + await room.invite(user_id, text=text, mode=mode) + + async def kick( + self, + user_id: aioxmpp.JID, + room_id: str, + reason: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + + occupant = self._get_occupant_by_jid(user_id=str(user_id), room=room) + await room.kick(occupant, reason=reason) + + async def ban( + self, + user_id: aioxmpp.JID, + room_id: str, + reason: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + + occupant = self._get_occupant_by_jid(user_id=str(user_id), room=room) + await room.ban(occupant, reason=reason) + + async def set_affiliation( + self, + user_id: aioxmpp.JID, + room_id: str, + affiliation: str, + reason: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + + occupant = self._get_occupant_by_jid(user_id=str(user_id), room=room) + await room.muc_set_affiliation( + occupant.direct_jid or occupant.conversation_jid, affiliation, reason=reason + ) + + async def set_role( + self, + user_id: aioxmpp.JID, + room_id: str, + role: str, + reason: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + + occupant = self._get_occupant_by_jid(user_id=str(user_id), room=room) + await room.muc_set_role(occupant.nick, role, reason=reason) + + async def set_topic(self, room_id: str, topic: str): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + await room.set_topic(topic) + + async def set_nick(self, room_id: str, nick: str): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + await room.set_nick(nick) + + # pylint: disable=too-many-branches + async def set_room_config( + self, + room_id: str, + name: Optional[bool] = None, + description: Optional[bool] = None, + members_only: Optional[bool] = None, + persistent: Optional[bool] = None, + moderated: Optional[bool] = None, + allow_invites: Optional[bool] = None, + allow_private_messages: Optional[bool] = None, + allow_change_subject: Optional[bool] = None, + enable_logging: Optional[bool] = None, + max_history_fetch: Optional[int] = None, + max_users: Optional[int] = None, + password_protected: Optional[bool] = None, + public: Optional[bool] = None, + room_admins: Optional[Iterable[Union[str, aioxmpp.JID]]] = None, + room_owners: Optional[Iterable[Union[str, aioxmpp.JID]]] = None, + password: Optional[str] = None, + language: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + config = await self.muc_client.get_room_config(room.jid) + form = aioxmpp.muc.xso.ConfigurationForm.from_xso(config) + + if members_only is not None: + form.membersonly.value = members_only + if persistent is not None: + form.persistentroom.value = persistent + if moderated is not None: + form.moderatedroom.value = moderated + if description is not None: + form.roomdesc.value = description + if name is not None: + form.roomname.value = name + if allow_invites is not None: + form.allowinvites.value = allow_invites + if allow_private_messages is not None: + form.allowpm.value = allow_private_messages + if allow_change_subject is not None: + form.changesubject.value = allow_change_subject + if enable_logging is not None: + form.enablelogging.value = enable_logging + if max_history_fetch is not None: + form.maxhistoryfetch.value = max_history_fetch + if max_users is not None: + form.maxusers.value = max_users + if password_protected is not None: + form.passwordprotectedroom.value = max_users + if public is not None: + form.publicroom.value = public + if password is not None: + form.roomsecret.value = password + if language is not None: + form.lang.value = language + if room_admins is not None: + form.roomadmins.value = [ + aioxmpp.JID.fromstr(user_id) if isinstance(user_id, str) else user_id + for user_id in room_admins + ] + if room_owners is not None: + form.roomowners.value = [ + aioxmpp.JID.fromstr(user_id) if isinstance(user_id, str) else user_id + for user_id in room_owners + ] + + await self.muc_client.set_room_config(room.jid, form.render_reply()) + + async def _register_room(self, room: aioxmpp.muc.Room): + room_id = str(room.jid) + if not self._state.rooms.get(room_id): + self._register_room_events(room) + if room_id in self._state.pending_rooms: + self._state.pending_rooms.remove(room_id) + + self._state.rooms[room_id] = room + self._post_user_room_event( + XmppRoomJoinEvent, + room=room, + user_id=self._jid, + is_self=True, + members=[ + str(m.direct_jid if m.direct_jid else m.conversation_jid) + for m in room.members + ], + ) + + await self._configure_room_on_join(room) + + async def _configure_room_on_join(self, room: aioxmpp.muc.Room): + # Check if I'm the owner of the room and there's only me here. + # If that's the case, odds are that the room has been newly created. + # Newly created rooms have public_room set to False by default + if len(room.members) != 1: + return + + member = room.members[0] + if not (member.is_self and member.affiliation == "owner"): + return + + config = await self.muc_client.get_room_config(room.jid) + form = aioxmpp.muc.xso.ConfigurationForm.from_xso(config) + + # If it's already a persistent room, then it's probably not a room that + # has just been created + if form.persistentroom.value: + return + + form.publicroom.value = True + form.allowinvites.value = True + await self.muc_client.set_room_config(room.jid, form.render_reply()) + + def _unregister_room(self, room: aioxmpp.muc.Room): + stored_room = self._state.rooms.pop(self._jid_to_str(room.jid), None) + if stored_room: + self._post_user_room_event( + XmppRoomLeaveEvent, + room=room, + user_id=self._jid, + is_self=True, + ) + + def _register_room_events(self, room: aioxmpp.muc.Room): + room.on_enter.connect(self._on_room_enter(room)) # type: ignore + room.on_exit.connect(self._on_room_exit(room)) # type: ignore + room.on_join.connect(self._on_room_join(room)) # type: ignore + room.on_leave.connect(self._on_room_leave(room)) # type: ignore + room.on_message.connect(self._on_msg_received(room)) # type: ignore + room.on_nick_changed.connect(self._on_room_nick_changed(room)) # type: ignore + room.on_presence_changed.connect(self._on_room_presence_changed(room)) # type: ignore + room.on_topic_changed.connect(self._on_room_topic_changed(room)) # type: ignore + room.on_muc_affiliation_changed.connect(self._on_room_muc_affiliation_changed(room)) # type: ignore + room.on_muc_role_changed.connect(self._on_room_muc_role_changed(room)) # type: ignore + + def _on_msg_received(self, room: aioxmpp.muc.Room): + def callback(msg, occupant: aioxmpp.muc.service.Occupant, *_, **__): + if not msg.body: + return + + if msg.error: + self.logger.warning( + 'Error on message from %s: %s', msg.from_, msg.error + ) + + body = msg.body.lookup([aioxmpp.structs.LanguageRange.fromstr('*')]) + self._post_room_occupant_event( + XmppRoomMessageReceivedEvent, + room=room, + occupant=occupant, + body=body.rstrip(), + ) + + return callback + + def _on_room_join(self, room: aioxmpp.muc.Room): + def callback(occupant: aioxmpp.muc.service.Occupant, *_, **__): + self._post_room_occupant_event( + XmppRoomJoinEvent, room=room, occupant=occupant + ) + + return callback + + def _on_room_leave(self, room: aioxmpp.muc.Room): + def callback(occupant: aioxmpp.muc.service.Occupant, *_, **__): + if occupant.is_self: + self._unregister_room(room) + else: + self._post_room_occupant_event( + XmppRoomLeaveEvent, room=room, occupant=occupant + ) + + return callback + + def _on_room_enter(self, room: aioxmpp.muc.Room): + def callback(*args, **__): + if args: + occupant = args[0] + self._post_room_occupant_event( + XmppRoomEnterEvent, room=room, occupant=occupant + ) + else: + self._async_run(self._register_room, room) + self._post_user_room_event( + XmppRoomEnterEvent, + room=room, + user_id=self._jid, + is_self=True, + ) + + return callback + + def _on_room_exit(self, room: aioxmpp.muc.Room): + def callback( + *args, + reason: Optional[str] = None, + **__, + ): + if args: + occupant = args[0] + self._post_room_occupant_event( + XmppRoomExitEvent, room=room, occupant=occupant, reason=reason + ) + else: + self._post_user_room_event( + XmppRoomExitEvent, + room=room, + user_id=self._jid, + is_self=True, + reason=reason, + ) + + return callback + + def _on_room_nick_changed(self, room: aioxmpp.muc.Room): + def callback( + member: aioxmpp.muc.service.Occupant, + old_nick: Optional[str], + new_nick: Optional[str], + *_, + **__, + ): + self._post_room_occupant_event( + XmppRoomNickChangedEvent, + room=room, + occupant=member, + old_nick=old_nick, + new_nick=new_nick, + ) + + return callback + + def _on_room_presence_changed(self, room: aioxmpp.muc.Room): + def callback( + occupant: aioxmpp.muc.service.Occupant, + _, + presence: aioxmpp.stanza.Presence, + **__, + ): + self._post_room_occupant_event( + XmppRoomPresenceChangedEvent, + room=room, + occupant=occupant, + status=aioxmpp.PresenceShow(presence.show).value + or XmppPresence.AVAILABLE.value, + ) + + return callback + + def _on_room_muc_affiliation_changed(self, room: aioxmpp.muc.Room): + def callback( + presence: aioxmpp.stanza.Presence, + *_, + actor: Optional[aioxmpp.muc.xso.UserActor] = None, + reason: Optional[str] = None, + **__, + ): + occupant = self._get_occupant_by_jid(room=room, user_id=str(presence.from_)) + self._post_room_occupant_event( + XmppRoomAffiliationChangedEvent, + room=room, + occupant=occupant, + affiliation=occupant.affiliation, + changed_by=str(actor.jid) if actor else None, + reason=reason, + ) + + return callback + + def _on_room_muc_role_changed(self, room: aioxmpp.muc.Room): + def callback( + presence: aioxmpp.stanza.Presence, + *_, + actor: Optional[aioxmpp.muc.xso.UserActor] = None, + reason: Optional[str] = None, + **__, + ): + occupant = self._get_occupant_by_jid(room=room, user_id=str(presence.from_)) + self._post_room_occupant_event( + XmppRoomRoleChangedEvent, + room=room, + occupant=occupant, + role=occupant.role, + changed_by=str(actor.jid) if actor else None, + reason=reason, + ) + + return callback + + def _on_room_topic_changed(self, room: aioxmpp.muc.Room): + def callback( + member: aioxmpp.muc.service.ServiceMember, + topic_map: aioxmpp.structs.LanguageMap, + **_, + ): + topic = topic_map.lookup([aioxmpp.structs.LanguageRange.fromstr('*')]) + self._post_room_event( + XmppRoomTopicChangedEvent, + room=room, + topic=topic, + changed_by=member.nick, + ) + + return callback + + def send_message( + self, + room_id: str, + body: str, + language: Optional[str] = None, + ): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + + target = room.jid + msg_type = aioxmpp.MessageType.GROUPCHAT + lang = language or self._lang + msg = aioxmpp.Message(type_=msg_type, to=target) + msg.body.update({lang: body}) + self._client.enqueue(msg) + + def accept_invite(self, room_id: str): + invite = self._state.room_invites.get(room_id) + assert invite, Errors.NO_INVITE + invite.accept() + + def reject_invite(self, room_id: str): + invite = self._state.room_invites.get(room_id) + assert invite, Errors.NO_INVITE + invite.reject() + + async def request_voice(self, room_id: str): + room = self._state.rooms.get(room_id) + assert room, Errors.ROOM_NOT_JOINED + await room.muc_request_voice() diff --git a/platypush/plugins/xmpp/_handlers/_roster.py b/platypush/plugins/xmpp/_handlers/_roster.py new file mode 100644 index 00000000..1b4498cb --- /dev/null +++ b/platypush/plugins/xmpp/_handlers/_roster.py @@ -0,0 +1,117 @@ +from typing import Union +from typing_extensions import override +import aioxmpp +import aioxmpp.roster.xso + +from platypush.message.event.xmpp import ( + XmppContactAddRequestAcceptedEvent, + XmppContactAddRequestEvent, + XmppContactAddRequestRejectedEvent, +) + +from .._types import Errors +from ._base import XmppBaseHandler + + +# pylint: disable=too-many-ancestors +class XmppRosterHandler(XmppBaseHandler): + """ + Handler for XMPP roster events. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.roster: aioxmpp.roster.RosterClient = self._client.summon( + aioxmpp.roster.RosterClient + ) + self.roster.on_entry_added.connect(self._on_roster_entry_added) # type: ignore + self.roster.on_entry_removed.connect(self._on_roster_entry_removed) # type: ignore + self.roster.on_subscribe.connect(self._on_roster_subscribe) # type: ignore + + @override + def restore_state(self): + if self._loaded_state.users: + for user_id in self._loaded_state.users: + self.add_user(user_id) + + def _on_roster_entry_added(self, item: aioxmpp.roster.Item, *_, **__): + self.add_user(item.jid) + + def _on_roster_entry_removed(self, item: aioxmpp.roster.Item, *_, **__): + self.remove_user(item.jid) + + def _on_roster_subscribe(self, stanza: aioxmpp.stanza.StanzaBase, *_, **__): + def accept(): + self.add_user(jid) + self.roster.approve(stanza.from_) + self._state.user_invites.pop(jid, None) + self._post_user_event(XmppContactAddRequestAcceptedEvent, user_id=jid) + + def reject(): + self._state.user_invites.pop(jid, None) + self._post_user_event(XmppContactAddRequestRejectedEvent, user_id=jid) + + if not (isinstance(stanza, aioxmpp.Presence) and stanza.to == self._jid): + return # Not a contact add request + + jid = str(stanza.from_) + invite = self._state.user_invites[jid] + self._post_user_event(XmppContactAddRequestEvent, user_id=stanza.from_) # type: ignore + invite.on_accepted = accept + invite.on_rejected = reject + + if self._config.auto_accept_invites: + invite.accept() + + def accept_invite(self, user_id: str): + invite = self._state.user_invites.get(user_id) + assert invite, Errors.NO_INVITE + invite.accept() + + def reject_invite(self, user_id: str): + invite = self._state.user_invites.get(user_id) + assert invite, Errors.NO_INVITE + invite.reject() + + @staticmethod + def _get_jid(user_id: Union[str, aioxmpp.JID]) -> aioxmpp.JID: + return ( + user_id + if isinstance(user_id, aioxmpp.JID) + else aioxmpp.JID.fromstr(user_id) + ).replace(resource=None) + + def add_user(self, user_id: Union[str, aioxmpp.JID]): + """ + Subscribe and add a user to the roster. + """ + + async def async_wrapper(*_, **__): + self.roster.subscribe(jid) + await self.roster.set_entry(jid) + + jid = self._get_jid(user_id) + self._async_run(async_wrapper, wait_result=False) + self._state.users.add(str(jid)) + self._state.user_invites.pop(str(jid), None) + + if self._state_serializer: + self._state_serializer.enqueue(self._state) + + def remove_user(self, user_id: Union[str, aioxmpp.JID]): + """ + Remove a user from the roster. + """ + + async def async_wrapper(*_, **__): + self.roster.unsubscribe(jid) + await self.roster.remove_entry(jid) + + jid = self._get_jid(user_id) + self._async_run(async_wrapper, wait_result=False) + self._state.user_invites.pop(str(jid), None) + + if str(jid) in self._state.users: + self._state.users.remove(str(jid)) + if self._state_serializer: + self._state_serializer.enqueue(self._state) diff --git a/platypush/plugins/xmpp/_mixins/__init__.py b/platypush/plugins/xmpp/_mixins/__init__.py new file mode 100644 index 00000000..03df95b9 --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/__init__.py @@ -0,0 +1,16 @@ +from ._async import XmppAsyncMixin +from ._base import XmppBaseMixin +from ._config import XmppConfigMixin +from ._events import XmppEventMixin +from ._state import XmppStateMixin +from ._event_state import XmppEventStateMixin + + +__all__ = [ + "XmppAsyncMixin", + "XmppBaseMixin", + "XmppConfigMixin", + "XmppEventMixin", + "XmppEventStateMixin", + "XmppStateMixin", +] diff --git a/platypush/plugins/xmpp/_mixins/_async.py b/platypush/plugins/xmpp/_mixins/_async.py new file mode 100644 index 00000000..391faf20 --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/_async.py @@ -0,0 +1,55 @@ +from abc import ABC, abstractmethod +import asyncio +import concurrent.futures +from typing import Callable, Coroutine, Optional + +from .._types import Errors +from ._base import XmppBaseMixin + + +# pylint: disable=too-few-public-methods +class XmppAsyncMixin(XmppBaseMixin, ABC): + """ + This mixin provides a common interface for aioxmpp's asyncio interface. + """ + + @abstractmethod + def __init__( + self, *args, loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs + ): + self._loop = loop + super().__init__(*args, **kwargs) + + def _async_run( + self, + coro: Callable[..., Coroutine], + *args, + timeout: Optional[float] = XmppBaseMixin.DEFAULT_TIMEOUT, + wait_result: bool = True, + **kwargs, + ): + """ + Utility method to call an async action from the thread of the parent + action. + """ + assert self._loop, Errors.LOOP + fut = asyncio.run_coroutine_threadsafe(coro(*args, **kwargs), self._loop) + + if wait_result: + err = None + + try: + return fut.result(timeout) + except (TimeoutError, concurrent.futures.TimeoutError) as e: + self.logger.warning( + 'Call to %s timed out after %f seconds', coro, timeout + ) + err = e + except Exception as e: + self.logger.warning('Call to %s failed: %s', coro, e) + self.logger.exception(e) + err = e + finally: + assert not err, str(err) + + return None diff --git a/platypush/plugins/xmpp/_mixins/_base.py b/platypush/plugins/xmpp/_mixins/_base.py new file mode 100644 index 00000000..1727cfd9 --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/_base.py @@ -0,0 +1,49 @@ +from abc import ABC, abstractmethod +from logging import getLogger +from typing import Optional, Union + +import aioxmpp + + +# pylint: disable=too-few-public-methods +class XmppBaseMixin(ABC): + """ + Base mixin for XMPP classes, containing common methods and properties. + """ + + DEFAULT_TIMEOUT = 20 + """Default timeout for async calls.""" + + @abstractmethod + def __init__( + self, + *_, + user_id: Union[str, aioxmpp.JID], + language: Optional[Union[str, aioxmpp.structs.LanguageTag]] = None, + client: Optional[aioxmpp.Client] = None, + **__, + ): + """ + :param user_id: Jabber/user ID, in the format ``user@example.org``. + :param language: ISO string for the language code that will be used by + the bot (default: ``None``). + :param client: The main XMPP client. + """ + self._jid = ( + aioxmpp.JID.fromstr(user_id) if isinstance(user_id, str) else user_id + ) + """The client's registered JID.""" + self._lang = ( + aioxmpp.structs.LanguageTag.fromstr(language) + if language and isinstance(language, str) + else language + ) + """The client's default language.""" + self._client: Optional[aioxmpp.Client] = client + """The main XMPP client.""" + self.logger = getLogger(f'platypush:xmpp:{self.__class__.__name__}') + + @staticmethod + def _jid_to_str(jid: aioxmpp.JID) -> str: + """Convert a JID to a simple string in the format ``localpart@domain``.""" + return f'{jid.localpart}@{jid.domain}' diff --git a/platypush/plugins/xmpp/_mixins/_config.py b/platypush/plugins/xmpp/_mixins/_config.py new file mode 100644 index 00000000..a0a1489c --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/_config.py @@ -0,0 +1,17 @@ +from abc import ABC, abstractmethod +from typing import Optional + +from .._config import XmppConfig +from ._base import XmppBaseMixin + + +# pylint: disable=too-few-public-methods +class XmppConfigMixin(XmppBaseMixin, ABC): + """ + A simple mixin that encapsulates an XMPP configuration object. + """ + + @abstractmethod + def __init__(self, *args, config: Optional[XmppConfig] = None, **kwargs): + self._config = config or XmppConfig() + super().__init__(*args, **kwargs) diff --git a/platypush/plugins/xmpp/_mixins/_event_state.py b/platypush/plugins/xmpp/_mixins/_event_state.py new file mode 100644 index 00000000..29080c13 --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/_event_state.py @@ -0,0 +1,20 @@ +from abc import ABC +from typing_extensions import override + +from ._events import XmppEventMixin +from ._state import XmppStateMixin + + +# pylint: disable=too-few-public-methods +class XmppEventStateMixin(XmppEventMixin, XmppStateMixin, ABC): + """ + A mixin that encapsulates the state of the XMPP clients and it provides the + features to handle events. + """ + + @override + def _post_event(self, *args, **kwargs): + if self._state_serializer: + self._state_serializer.enqueue(self._state) + + return super()._post_event(*args, **kwargs) diff --git a/platypush/plugins/xmpp/_mixins/_events.py b/platypush/plugins/xmpp/_mixins/_events.py new file mode 100644 index 00000000..bdfcf89d --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/_events.py @@ -0,0 +1,124 @@ +from abc import ABC +from typing import Optional, Type, Union + +import aioxmpp +import aioxmpp.im.p2p + +from platypush.context import get_bus +from platypush.message.event.xmpp import XmppEvent + +from ._base import XmppBaseMixin + + +# pylint: disable=too-few-public-methods +class XmppEventMixin(XmppBaseMixin, ABC): + """ + This mixin provides utility methods to post XMPP events. + """ + + def _post_event(self, event_type: Type[XmppEvent], *args, **kwargs): + get_bus().post( + event_type( + *args, + client_jabber_id=self._jid_to_str( + self._client.local_jid if self._client else self._jid + ), + **kwargs, + ) + ) + + def _post_user_event( + self, + event_type: Type[XmppEvent], + user_id: Union[str, aioxmpp.JID], + *args, + **kwargs, + ): + if isinstance(user_id, str): + kwargs['user_id'] = user_id + kwargs['jid'] = user_id + else: + kwargs['user_id'] = self._jid_to_str(user_id) + kwargs['jid'] = str(user_id) + + self._post_event(event_type, *args, **kwargs) + + def _post_room_event( + self, event_type: Type[XmppEvent], room: aioxmpp.muc.Room, *args, **kwargs + ): + self._post_event( + event_type, *args, room_id=self._jid_to_str(room.jid), **kwargs + ) + + def _post_user_room_event( + self, + event_type: Type[XmppEvent], + room: aioxmpp.muc.Room, + user_id: Union[str, aioxmpp.JID], + *args, + **kwargs, + ): + self._post_user_event( + event_type, + *args, + user_id=user_id, + room_id=self._jid_to_str(room.jid), + **kwargs, + ) + + def _post_conversation_event( + self, + event_type: Type[XmppEvent], + conversation: aioxmpp.im.p2p.Conversation, + *args, + **kwargs, + ): + self._post_event( + event_type, *args, conversation_id=str(conversation.jid), **kwargs + ) + + def _post_room_occupant_event( + self, + event_type: Type[XmppEvent], + room: aioxmpp.muc.Room, + occupant: aioxmpp.muc.service.Occupant, + *args, + user_id: Optional[str] = None, + **kwargs, + ): + self._post_user_room_event( + event_type, + *args, + room=room, + user_id=user_id + or ( + occupant.direct_jid + if occupant.direct_jid + else occupant.conversation_jid + ), + is_self=occupant.is_self, + **kwargs, + ) + + def _post_conversation_member_event( + self, + event_type: Type[XmppEvent], + conversation: aioxmpp.im.p2p.Conversation, + member: aioxmpp.im.p2p.Member, + *args, + user_id: Optional[str] = None, + **kwargs, + ): + self._post_conversation_event( + event_type, + *args, + conversation=conversation, + user_id=user_id + or ( + self._jid_to_str(member.direct_jid) + if member.direct_jid + else str(member.conversation_jid) + ), + is_self=member.is_self, + **kwargs, + ) diff --git a/platypush/plugins/xmpp/_mixins/_state.py b/platypush/plugins/xmpp/_mixins/_state.py new file mode 100644 index 00000000..750bdb74 --- /dev/null +++ b/platypush/plugins/xmpp/_mixins/_state.py @@ -0,0 +1,33 @@ +from abc import ABC, abstractmethod +from typing import Optional + +from .._state import SerializedState, StateSerializer, XmppState +from ._base import XmppBaseMixin + + +# pylint: disable=too-few-public-methods +class XmppStateMixin(XmppBaseMixin, ABC): + """ + A simple mixin that encapsulates an XMPP state object. + """ + + @abstractmethod + def __init__( + self, + *args, + state: Optional[XmppState] = None, + loaded_state: Optional[SerializedState] = None, + state_serializer: Optional[StateSerializer] = None, + **kwargs, + ): + self._state = state or XmppState() + self._loaded_state = loaded_state or SerializedState() + self._state_serializer = state_serializer + super().__init__(*args, **kwargs) + + def restore_state(self): + """ + Function called by the plugin once connected to notify that the + component should reload the previous state (optional, to be implemented + by derived classes). + """ diff --git a/platypush/plugins/xmpp/_state/__init__.py b/platypush/plugins/xmpp/_state/__init__.py new file mode 100644 index 00000000..98ab608e --- /dev/null +++ b/platypush/plugins/xmpp/_state/__init__.py @@ -0,0 +1,5 @@ +from ._model import SerializedState, XmppState +from ._serializer import StateSerializer + + +__all__ = ["SerializedState", "StateSerializer", "XmppState"] diff --git a/platypush/plugins/xmpp/_state/_model.py b/platypush/plugins/xmpp/_state/_model.py new file mode 100644 index 00000000..1b4866d3 --- /dev/null +++ b/platypush/plugins/xmpp/_state/_model.py @@ -0,0 +1,166 @@ +from asyncio import Event as AsyncEvent +from collections import defaultdict +from dataclasses import dataclass, field +from threading import Event +from typing import Any, Dict, Iterable, Optional, Set + +import aioxmpp +import aioxmpp.im.p2p + +from .._types import RoomInvite, UserInvite + + +@dataclass +class OccupantState: + """ + Models the state of a room occupant. + """ + + user_id: str + nick: Optional[str] + affiliation: Optional[str] + role: Optional[str] + is_self: bool + available: bool + state: Optional[str] + + @classmethod + def load(cls, occupant_dict: Dict[str, Any]) -> "OccupantState": + return cls( + user_id=occupant_dict["user_id"], + nick=occupant_dict.get("nick"), + affiliation=occupant_dict.get("affiliation"), + role=occupant_dict.get("role"), + is_self=occupant_dict["is_self"], + available=occupant_dict["available"], + state=occupant_dict.get("state"), + ) + + +@dataclass +class RoomState: + """ + Models the state of a room. + """ + + room_id: str + joined: bool + state: Optional[str] + nick: Optional[str] + password: Optional[str] + members: Dict[str, OccupantState] = field(default_factory=dict) + + @classmethod + def load(cls, room_dict: Dict[str, Any]) -> "RoomState": + return cls( + room_id=room_dict["room_id"], + joined=room_dict["joined"], + state=room_dict.get("state"), + nick=room_dict.get("nick"), + password=room_dict.get("password"), + members={ + user_id: OccupantState.load(member) + for user_id, member in room_dict.get("members", {}).items() + }, + ) + + +@dataclass +class SerializedState: + """ + Serialized snapshot of the XMPP state, which can be more easily + serialized/deserialized to JSON. + """ + + users: Iterable[str] = field(default_factory=list) + """List of users on the subscriptions/contacts list.""" + rooms: Dict[str, RoomState] = field(default_factory=dict) + """List of rooms the user has joined.""" + room_invites: Iterable[str] = field(default_factory=list) + """List of room invites, by room_id.""" + user_invites: Iterable[str] = field(default_factory=list) + """List of user invites, by user_id.""" + + @classmethod + def load(cls, state: Dict[str, Any]) -> "SerializedState": + return cls( + users=state.get("users", []), + rooms={ + room_id: RoomState.load(room) + for room_id, room in state.get("rooms", {}).items() + }, + room_invites=state.get("room_invites", []), + user_invites=state.get("user_invites", []), + ) + + +@dataclass +class XmppState: + """ + Models the state of the XMPP client. + """ + + rooms: Dict[str, aioxmpp.muc.service.Room] = field(default_factory=dict) + conversations: Dict[str, aioxmpp.im.p2p.Conversation] = field(default_factory=dict) + users: Set[str] = field(default_factory=set) + room_invites: Dict[str, RoomInvite] = field( + default_factory=lambda: defaultdict(RoomInvite) + ) + user_invites: Dict[str, UserInvite] = field( + default_factory=lambda: defaultdict(UserInvite) + ) + disconnect_notified: Event = field(default_factory=Event) + should_stop: AsyncEvent = field(default_factory=AsyncEvent) + pending_rooms: Set[str] = field(default_factory=set) + """Set of rooms that are currently being joined""" + + @staticmethod + def _occupant_user_id(occupant: aioxmpp.muc.service.Occupant) -> str: + return ( + str(occupant.direct_jid.replace(resource=None)) + if occupant.direct_jid is not None + else str(occupant.conversation_jid) + ) + + def asdict(self, return_passwords=True): + """ + :return: The state of the client as a flat dictionary. + """ + return { + "room_invites": list(self.room_invites.keys()), + "user_invites": list(self.user_invites.keys()), + "users": list({*self.conversations.keys(), *self.users}), + "rooms": { + room_id: { + "room_id": str(room.jid), + "joined": room.muc_joined, + "state": room.muc_state.name, + "nick": room.me.nick if room.me else None, + **({"password": room.muc_password} if return_passwords else {}), + "members": { + self._occupant_user_id(member): { + "user_id": self._occupant_user_id(member), + "nick": member.nick, + "affiliation": member.affiliation, + "role": member.role, + "is_self": member.is_self, + "available": member.presence_state.available, + "state": ( + "available" + if not member.presence_state.show.value + else member.presence_state.show.value + ), + } + for member in room.members + }, + } + for room_id, room in self.rooms.items() + }, + } + + def serialize(self) -> SerializedState: + """ + :return: The JSON-friendly dehydrated representation of the state, which + can be restored across restarts. + """ + return SerializedState(**self.asdict()) diff --git a/platypush/plugins/xmpp/_state/_serializer.py b/platypush/plugins/xmpp/_state/_serializer.py new file mode 100644 index 00000000..2d8067a0 --- /dev/null +++ b/platypush/plugins/xmpp/_state/_serializer.py @@ -0,0 +1,139 @@ +from dataclasses import asdict +import json +from logging import getLogger +import pathlib +from threading import Event, RLock, Timer +from typing import Final, Optional + +from .._mixins import XmppConfigMixin +from ._model import SerializedState, XmppState + + +class StateSerializer(XmppConfigMixin): + """ + Serializes to file the state of the client upon new events through a + timer-based mechanism. + """ + + _DEFAULT_FLUSH_TIMEOUT: Final[float] = 2 + _EMPTY_STATE: Final[SerializedState] = SerializedState() + + def __init__(self, *args, flush_timeout: float = _DEFAULT_FLUSH_TIMEOUT, **kwargs): + """ + :param flush_timeout: How long the scheduler should wait before + flushing the state. + """ + super().__init__(*args, **kwargs) + self.flush_timeout = flush_timeout + self._timer: Optional[Timer] = None + self._state_lock: Final[RLock] = RLock() + self._state: Optional[XmppState] = None + self._flush_scheduled: Final[Event] = Event() + self.logger = getLogger(__name__) + + def _writer_inner(self, filename: str): + if not self._state: + return + + self.logger.debug("Serializing state to file: %s", filename) + pathlib.Path(filename).parent.mkdir(parents=True, exist_ok=True) + with open(filename, "w") as f: + json.dump(asdict(self._state.serialize()), f) + + def _writer(self): + """ + Write the current state to the file. + """ + + state_file = self._config.state_file + if not state_file: + return + + with self._state_lock: + try: + self._writer_inner(state_file) + finally: + self._reset() + + def _reset(self): + """ + Reset the timer state after normal termination, error or cancellation. + """ + self._flush_scheduled.clear() + self._timer = None + + def load(self) -> SerializedState: + """ + :return: The previous state read from the configured state file. + """ + state_file = self._config.state_file + if not (state_file and self._config.restore_state): + return self._EMPTY_STATE + + try: + with open(state_file, "r") as f: + return SerializedState.load(json.load(f)) + except FileNotFoundError: + self.logger.info("No previous state file found at %s", state_file) + return self._EMPTY_STATE + except ValueError: + self.logger.warning( + "Invalid or corrupt state file found at %s, it will be reset", + state_file, + ) + return self._EMPTY_STATE + + def enqueue(self, state: XmppState): + """ + Schedule an update of the stored state. + """ + with self._state_lock: + self._state = state + + if not self.is_pending(): + self.logger.debug( + "Serialization writer scheduled in %f seconds", self.flush_timeout + ) + self._timer = Timer(self.flush_timeout, self._writer) + self._timer.name = "xmpp:StateSerializer" + self._timer.start() + + self._flush_scheduled.set() + + def flush(self): + """ + Flush the state immediately, without waiting for the next schedule. + """ + with self._state_lock: + self._writer() + + def is_pending(self) -> bool: + """ + :return: ``True`` if there is a pending serialization task, ``False`` + otherwise. + """ + return self._timer is not None and self._flush_scheduled.is_set() + + def wait(self, timeout: Optional[float] = None): + """ + If a serialization task is pending or running, wait for it to terminate. + """ + if self._timer and self.is_pending(): + self._timer.join(timeout) + + with self._state_lock: + if self._timer and self.is_pending(): + self.logger.warning( + "The state serialization task did not terminate in time" + ) + + self.cancel() + + def cancel(self): + """ + Cancel the timer, if it is running. + """ + if self._timer: + self._timer.cancel() + + self._reset() diff --git a/platypush/plugins/xmpp/_types/__init__.py b/platypush/plugins/xmpp/_types/__init__.py new file mode 100644 index 00000000..fb60b18d --- /dev/null +++ b/platypush/plugins/xmpp/_types/__init__.py @@ -0,0 +1,12 @@ +from ._errors import Errors +from ._invite import Invite, RoomInvite, UserInvite +from ._presence import XmppPresence + + +__all__ = [ + "Errors", + "Invite", + "RoomInvite", + "UserInvite", + "XmppPresence", +] diff --git a/platypush/plugins/xmpp/_types/_errors.py b/platypush/plugins/xmpp/_types/_errors.py new file mode 100644 index 00000000..ceff8e1d --- /dev/null +++ b/platypush/plugins/xmpp/_types/_errors.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass +from typing import Final + + +@dataclass +class Errors: + """ + A static class to model plugin error messages. + """ + + CLIENT: Final[str] = 'The XMPP client is not connected' + HANDLERS: Final[str] = 'No registered XMPP handlers found' + LOOP: Final[str] = 'The event loop is not running' + NO_INVITE: Final[str] = 'No such conversation invite' + NO_USER: Final[str] = 'No such user' + ROOM_NOT_JOINED: Final[str] = 'The bot has not joined this room' + USER_ID_OR_ROOM_ID: Final[str] = 'You should specify either user_id or room_id' diff --git a/platypush/plugins/xmpp/_types/_invite.py b/platypush/plugins/xmpp/_types/_invite.py new file mode 100644 index 00000000..480ffbee --- /dev/null +++ b/platypush/plugins/xmpp/_types/_invite.py @@ -0,0 +1,73 @@ +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from enum import Enum +from threading import Event +from typing import Any, Callable, Optional, Type + + +class InviteTarget(Enum): + """ + Tells whether the target of an invite is a user or a room. + """ + + USER = 1 + ROOM = 2 + + +@dataclass +class Invite(ABC): + """ + A class that models the parameters of an invite to a conversation. + """ + + accepted: Optional[bool] = None + responded: Event = field(default_factory=Event) + on_accepted: Callable[[], Any] = field(default_factory=lambda: lambda: None) + on_rejected: Callable[[], Any] = field(default_factory=lambda: lambda: None) + + @property + @abstractmethod + def target(self) -> InviteTarget: + raise NotImplementedError + + def accept(self): + self.accepted = True + self.responded.set() + self.on_accepted() + + def reject(self): + self.accepted = False + self.responded.set() + self.on_rejected() + + def wait_response(self, timeout: Optional[float] = None) -> bool: + return self.responded.wait(timeout) + + @classmethod + def by_target(cls, target: InviteTarget) -> Type["Invite"]: + return { + InviteTarget.ROOM: RoomInvite, + InviteTarget.USER: UserInvite, + }[target] + + +@dataclass +class RoomInvite(Invite): + """ + Models an invite to a room. + """ + + @property + def target(self) -> InviteTarget: + return InviteTarget.ROOM + + +@dataclass +class UserInvite(Invite): + """ + Models an invite to a user's contacts list. + """ + + @property + def target(self) -> InviteTarget: + return InviteTarget.USER diff --git a/platypush/plugins/xmpp/_types/_presence.py b/platypush/plugins/xmpp/_types/_presence.py new file mode 100644 index 00000000..ae82d789 --- /dev/null +++ b/platypush/plugins/xmpp/_types/_presence.py @@ -0,0 +1,19 @@ +from enum import Enum + + +class XmppPresence(Enum): + """ + Models the XMPP presence states. + """ + + AVAILABLE = "available" + NONE = AVAILABLE + PLAIN = AVAILABLE + OFFLINE = "offline" + XA = "xa" + EXTENDED_AWAY = XA + AWAY = "away" + CHAT = "chat" + FREE_FOR_CHAT = CHAT + DND = "dnd" + DO_NOT_DISTURB = DND diff --git a/platypush/plugins/xmpp/manifest.yaml b/platypush/plugins/xmpp/manifest.yaml new file mode 100644 index 00000000..fd4a67f6 --- /dev/null +++ b/platypush/plugins/xmpp/manifest.yaml @@ -0,0 +1,36 @@ +manifest: + events: + platypush.message.event.xmpp.XmppConnectedEvent: + platypush.message.event.xmpp.XmppContactAddRequestAcceptedEvent: + platypush.message.event.xmpp.XmppContactAddRequestEvent: + platypush.message.event.xmpp.XmppContactAddRequestRejectedEvent: + platypush.message.event.xmpp.XmppConversationAddedEvent: + platypush.message.event.xmpp.XmppConversationEnterEvent: + platypush.message.event.xmpp.XmppConversationExitEvent: + platypush.message.event.xmpp.XmppConversationJoinEvent: + platypush.message.event.xmpp.XmppConversationLeaveEvent: + platypush.message.event.xmpp.XmppConversationNickChangedEvent: + platypush.message.event.xmpp.XmppDisconnectedEvent: + platypush.message.event.xmpp.XmppMessageReceivedEvent: + platypush.message.event.xmpp.XmppPresenceChangedEvent: + platypush.message.event.xmpp.XmppRoomAffiliationChangedEvent: + platypush.message.event.xmpp.XmppRoomEnterEvent: + platypush.message.event.xmpp.XmppRoomExitEvent: + platypush.message.event.xmpp.XmppRoomInviteAcceptedEvent: + platypush.message.event.xmpp.XmppRoomInviteEvent: + platypush.message.event.xmpp.XmppRoomInviteRejectedEvent: + platypush.message.event.xmpp.XmppRoomJoinEvent: + platypush.message.event.xmpp.XmppRoomLeaveEvent: + platypush.message.event.xmpp.XmppRoomMessageReceivedEvent: + platypush.message.event.xmpp.XmppRoomNickChangedEvent: + platypush.message.event.xmpp.XmppRoomPresenceChangedEvent: + platypush.message.event.xmpp.XmppRoomRoleChangedEvent: + platypush.message.event.xmpp.XmppRoomTopicChangedEvent: + platypush.message.event.xmpp.XmppRoomUserUnavailableEvent: + platypush.message.event.xmpp.XmppUserAvailableEvent: + platypush.message.event.xmpp.XmppUserUnavailableEvent: + pip: + - aioxmpp + - pytz + package: platypush.plugins.xmpp + type: plugin diff --git a/setup.py b/setup.py index 58c94c04..64652ce9 100755 --- a/setup.py +++ b/setup.py @@ -283,5 +283,7 @@ setup( 'irc': ['irc'], # Support for the Matrix integration 'matrix': ['matrix-nio'], + # Support for the XMPP integration + 'xmpp': ['aioxmpp', 'pytz'], }, )