From d7b273434b6343926673b91064605228a3d468db Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sun, 14 Nov 2021 19:43:19 +0100 Subject: [PATCH] [#203] Added IRC integration --- CHANGELOG.md | 4 +- docs/source/conf.py | 6 + docs/source/events.rst | 1 + docs/source/platypush/events/irc.rst | 5 + docs/source/platypush/plugins/chat.irc.rst | 5 + docs/source/plugins.rst | 1 + platypush/message/event/__init__.py | 4 +- platypush/message/event/irc.py | 184 ++++++++ platypush/plugins/chat/irc/__init__.py | 396 ++++++++++++++++++ platypush/plugins/chat/irc/_bot.py | 462 +++++++++++++++++++++ platypush/plugins/chat/irc/manifest.yaml | 25 ++ platypush/plugins/gotify/manifest.yaml | 3 +- platypush/schemas/irc.py | 135 ++++++ setup.py | 2 + 14 files changed, 1230 insertions(+), 3 deletions(-) create mode 100644 docs/source/platypush/events/irc.rst create mode 100644 docs/source/platypush/plugins/chat.irc.rst create mode 100644 platypush/message/event/irc.py create mode 100644 platypush/plugins/chat/irc/__init__.py create mode 100644 platypush/plugins/chat/irc/_bot.py create mode 100644 platypush/plugins/chat/irc/manifest.yaml create mode 100644 platypush/schemas/irc.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c7a3d288a..62be31144 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,10 +7,12 @@ Given the high speed of development in the first phase, changes are being report ### Added -- Added Mastodon integration. +- Added `mastodon` plugin. +- Added `chat.irc` plugin. ### Fixed +- Fixed `switchbot.status` method in case of virtual devices. - Fixed `switchbot.status` method in case of virtual devices. ## [0.22.4] - 2021-10-19 diff --git a/docs/source/conf.py b/docs/source/conf.py index 561691c01..20a3965e0 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -281,6 +281,12 @@ autodoc_mock_imports = ['googlesamples.assistant.grpc.audio_helpers', 'aiohttp', 'watchdog', 'pyngrok', + 'irc', + 'irc.bot', + 'irc.strings', + 'irc.client', + 'irc.connection', + 'irc.events', ] sys.path.insert(0, os.path.abspath('../..')) diff --git a/docs/source/events.rst b/docs/source/events.rst index 28067f3b7..80d3778b3 100644 --- a/docs/source/events.rst +++ b/docs/source/events.rst @@ -32,6 +32,7 @@ Events platypush/events/http.hook.rst platypush/events/http.rss.rst platypush/events/inotify.rst + platypush/events/irc.rst platypush/events/joystick.rst platypush/events/kafka.rst platypush/events/light.rst diff --git a/docs/source/platypush/events/irc.rst b/docs/source/platypush/events/irc.rst new file mode 100644 index 000000000..7eab9d025 --- /dev/null +++ b/docs/source/platypush/events/irc.rst @@ -0,0 +1,5 @@ +``platypush.message.event.irc`` +=============================== + +.. automodule:: platypush.message.event.irc + :members: diff --git a/docs/source/platypush/plugins/chat.irc.rst b/docs/source/platypush/plugins/chat.irc.rst new file mode 100644 index 000000000..fa6196998 --- /dev/null +++ b/docs/source/platypush/plugins/chat.irc.rst @@ -0,0 +1,5 @@ +``chat.irc`` +============ + +.. automodule:: platypush.plugins.chat.irc + :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 99bdc2bdd..6ed7832e8 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -23,6 +23,7 @@ Plugins platypush/plugins/camera.gstreamer.rst platypush/plugins/camera.ir.mlx90640.rst platypush/plugins/camera.pi.rst + platypush/plugins/chat.irc.rst platypush/plugins/chat.telegram.rst platypush/plugins/clipboard.rst platypush/plugins/config.rst diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index 51de55cee..71eb2a5ca 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -39,7 +39,9 @@ class Event(Message): self.disable_web_clients_notification = disable_web_clients_notification for arg, value in self.args.items(): - if arg != 'args': + if arg not in [ + 'id', 'args', 'origin', 'target', 'type', 'timestamp', 'disable_logging' + ] and not arg.startswith('_'): self.__setattr__(arg, value) @classmethod diff --git a/platypush/message/event/irc.py b/platypush/message/event/irc.py new file mode 100644 index 000000000..edaea5b0e --- /dev/null +++ b/platypush/message/event/irc.py @@ -0,0 +1,184 @@ +from abc import ABC +from base64 import b64encode +from typing import Optional + +from platypush.message.event import Event + + +class IRCEvent(Event, ABC): + """ + IRC base event. + """ + def __init__(self, *args, server: Optional[str] = None, port: Optional[int] = None, + alias: Optional[str] = None, channel: Optional[str] = None, **kwargs): + super().__init__(*args, server=server, port=port, alias=alias, channel=channel, **kwargs) + + +class IRCChannelJoinEvent(IRCEvent): + """ + Event triggered upon account channel join. + """ + def __init__(self, *args, nick: str, **kwargs): + super().__init__(*args, nick=nick, **kwargs) + + +class IRCChannelKickEvent(IRCEvent): + """ + Event triggered upon account channel kick. + """ + def __init__(self, *args, target_nick: str, source_nick: Optional[str] = None, **kwargs): + super().__init__(*args, source_nick=source_nick, target_nick=target_nick, **kwargs) + + +class IRCModeEvent(IRCEvent): + """ + Event triggered when the IRC mode of a channel user changes. + """ + def __init__( + self, *args, mode: str, channel: Optional[str] = None, + source: Optional[str] = None, + target_: Optional[str] = None, **kwargs + ): + super().__init__(*args, mode=mode, channel=channel, source=source, target_=target_, **kwargs) + + +class IRCPartEvent(IRCEvent): + """ + Event triggered when an IRC nick parts. + """ + def __init__(self, *args, nick: str, **kwargs): + super().__init__(*args, nick=nick, **kwargs) + + +class IRCQuitEvent(IRCEvent): + """ + Event triggered when an IRC nick quits. + """ + def __init__(self, *args, nick: str, **kwargs): + super().__init__(*args, nick=nick, **kwargs) + + +class IRCNickChangeEvent(IRCEvent): + """ + Event triggered when a IRC nick changes. + """ + def __init__(self, *args, before: str, after: str, **kwargs): + super().__init__(*args, before=before, after=after, **kwargs) + + +class IRCConnectEvent(IRCEvent): + """ + Event triggered upon server connection. + """ + + +class IRCDisconnectEvent(IRCEvent): + """ + Event triggered upon server disconnection. + """ + + +class IRCPrivateMessageEvent(IRCEvent): + """ + Event triggered when a private message is received. + """ + def __init__(self, *args, text: str, nick: str, mentions_me: bool = False, **kwargs): + super().__init__(*args, text=text, nick=nick, mentions_me=mentions_me, **kwargs) + + +class IRCPublicMessageEvent(IRCEvent): + """ + Event triggered when a public message is received. + """ + def __init__(self, *args, text: str, nick: str, mentions_me: bool = False, **kwargs): + super().__init__(*args, text=text, nick=nick, mentions_me=mentions_me, **kwargs) + + +class IRCDCCRequestEvent(IRCEvent): + """ + Event triggered when a DCC connection request is received. + """ + def __init__(self, *args, address: str, port: int, nick: str, **kwargs): + super().__init__(*args, address=address, port=port, nick=nick, **kwargs) + + +class IRCDCCMessageEvent(IRCEvent): + """ + Event triggered when a DCC message is received. + """ + def __init__(self, *args, address: str, body: bytes, **kwargs): + super().__init__( + *args, address=address, body=b64encode(body).decode(), **kwargs + ) + + +class IRCCTCPMessageEvent(IRCEvent): + """ + Event triggered when a CTCP message is received. + """ + def __init__(self, *args, address: str, message: str, **kwargs): + super().__init__(*args, address=address, message=message, **kwargs) + + +class IRCDCCFileRequestEvent(IRCEvent): + """ + Event triggered when a DCC file send request is received. + """ + def __init__( + self, *args, nick: str, address: str, file: str, + port: int, size: Optional[int] = None, **kwargs + ): + super().__init__( + *args, nick=nick, address=address, file=file, port=port, + size=size, **kwargs + ) + + +class IRCDCCFileRecvCompletedEvent(IRCEvent): + """ + Event triggered when a DCC file transfer RECV is completed. + """ + def __init__( + self, *args, address: str, port: int, file: str, + size: Optional[int] = None, **kwargs + ): + super().__init__( + *args, address=address, file=file, + port=port, size=size, **kwargs + ) + + +class IRCDCCFileRecvCancelledEvent(IRCEvent): + """ + Event triggered when a DCC file transfer RECV is cancelled. + """ + def __init__( + self, *args, address: str, port: int, file: str, + error: str, **kwargs + ): + super().__init__( + *args, address=address, file=file, port=port, + error=error, **kwargs + ) + + +class IRCDCCFileSendCompletedEvent(IRCEvent): + """ + Event triggered when a DCC file transfer SEND is completed. + """ + def __init__(self, *args, address: str, port: int, file: str, **kwargs): + super().__init__(*args, address=address, file=file, port=port, **kwargs) + + +class IRCDCCFileSendCancelledEvent(IRCEvent): + """ + Event triggered when a DCC file transfer SEND is cancelled. + """ + def __init__( + self, *args, address: str, port: int, file: str, + error: str, **kwargs + ): + super().__init__( + *args, address=address, file=file, port=port, + error=error, **kwargs + ) diff --git a/platypush/plugins/chat/irc/__init__.py b/platypush/plugins/chat/irc/__init__.py new file mode 100644 index 000000000..e7abdae21 --- /dev/null +++ b/platypush/plugins/chat/irc/__init__.py @@ -0,0 +1,396 @@ +import os +from typing import Sequence, Dict, Tuple, Union, Optional + +from platypush.plugins import RunnablePlugin, action +from platypush.schemas.irc import IRCServerSchema, IRCServerStatusSchema, IRCChannelSchema + +from ._bot import IRCBot +from .. import ChatPlugin + + +class ChatIrcPlugin(RunnablePlugin, ChatPlugin): + """ + IRC integration. + + This plugin allows you to easily create IRC bots with custom logic that reacts to IRC events + and interact with IRC sessions. + + Triggers: + + * :class:`platypush.message.event.irc.IRCChannelJoinEvent` when a user joins a channel. + * :class:`platypush.message.event.irc.IRCChannelKickEvent` when a user is kicked from a channel. + * :class:`platypush.message.event.irc.IRCModeEvent` when a user/channel mode change event occurs. + * :class:`platypush.message.event.irc.IRCPartEvent` when a user parts a channel. + * :class:`platypush.message.event.irc.IRCQuitEvent` when a user quits. + * :class:`platypush.message.event.irc.IRCNickChangeEvent` when a user nick changes. + * :class:`platypush.message.event.irc.IRCConnectEvent` when the bot connects to a server. + * :class:`platypush.message.event.irc.IRCDisconnectEvent` when the bot disconnects from a server. + * :class:`platypush.message.event.irc.IRCPrivateMessageEvent` when a private message is received. + * :class:`platypush.message.event.irc.IRCPublicMessageEvent` when a public message is received. + * :class:`platypush.message.event.irc.IRCDCCRequestEvent` when a DCC connection request is received. + * :class:`platypush.message.event.irc.IRCDCCMessageEvent` when a DCC message is received. + * :class:`platypush.message.event.irc.IRCCTCPMessageEvent` when a CTCP message is received. + * :class:`platypush.message.event.irc.IRCDCCFileRequestEvent` when a DCC file request is received. + * :class:`platypush.message.event.irc.IRCDCCFileRecvCompletedEvent` when a DCC file download is completed. + * :class:`platypush.message.event.irc.IRCDCCFileRecvCancelledEvent` when a DCC file download is cancelled. + * :class:`platypush.message.event.irc.IRCDCCFileSendCompletedEvent` when a DCC file upload is completed. + * :class:`platypush.message.event.irc.IRCDCCFileSendCancelledEvent` when a DCC file upload is cancelled. + + Requires: + + * **irc** (``pip install irc``) + + """ + + def __init__(self, servers: Sequence[dict], **kwargs): + """ + :param servers: List of servers/channels that the bot will automatically connect/join. + """ + super().__init__(**kwargs) + try: + self._bots: Dict[Tuple[str, int], IRCBot] = { + (server_conf['server'], server_conf['port']): IRCBot(**server_conf) + for server_conf in IRCServerSchema().load(servers, many=True) + } + except Exception as e: + self.logger.warning(f'Could not load IRC server configuration: {e}') + self.logger.exception(e) + raise e + + @property + def _bots_by_server(self) -> Dict[str, IRCBot]: + return { + bot.server: bot + for srv, bot in self._bots.items() + } + + @property + def _bots_by_server_and_port(self) -> Dict[Tuple[str, int], IRCBot]: + return { + (bot.server, bot.port): bot + for srv, bot in self._bots.items() + } + + @property + def _bots_by_alias(self) -> Dict[str, IRCBot]: + return { + bot.alias: bot + for srv, bot in self._bots.items() + if bot.alias + } + + def main(self): + self._connect() + self._should_stop.wait() + + def _connect(self): + for srv, bot in self._bots.items(): + self.logger.info(f'Connecting to IRC server {srv}') + bot.start() + + def stop(self): + for srv, bot in self._bots.items(): + self.logger.info(f'Disconnecting from IRC server {srv}') + try: + bot.stop(bot.stop_message or 'Application stopped') + except Exception as e: + self.logger.warning(f'Error while stopping connection to {srv}: {e}') + + super().stop() + + def _get_bot(self, server: Union[str, Tuple[str, int]]) -> IRCBot: + if isinstance(server, (tuple, list, set)): + bot = self._bots_by_server_and_port[tuple(server)] + else: + bot = self._bots_by_alias.get(server, self._bots_by_server.get(server)) + + assert bot, f'Bot connection to {server} not found' + return bot + + @action + def send_file( + self, file: str, server: Union[str, Tuple[str, int]], nick: str, bind_address: Optional[str] = None + ): + """ + Send a file to an IRC user over DCC connection. + Note that passive connections are currently not supported. + + :param file: Path of the file that should be transferred. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param nick: Target IRC nick. + :param bind_address: DCC listen bind address (default: any). + """ + file = os.path.expanduser(file) + assert os.path.isfile(file), f'{file} is not a regular file' + bot = self._get_bot(server) + bot.dcc_file_transfer(file=file, nick=nick, bind_address=bind_address) + + @action + def send_message( + self, text: str, server: Union[str, Tuple[str, int]], target: Union[str, Sequence[str]] + ): + """ + Send a message to a channel or a nick. + + :param text: Message content. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param target: Message target (nick or channel). If it's a list then the message will be sent + to multiple targets. + """ + bot = self._get_bot(server) + method = ( + bot.connection.privmsg if isinstance(target, str) + else bot.connection.privmsg_many + ) + method(target, text) + + @action + def send_notice( + self, text: str, server: Union[str, Tuple[str, int]], target: str + ): + """ + Send a notice to a channel or a nick. + + :param text: Message content. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param target: Message target (nick or channel). + """ + bot = self._get_bot(server) + bot.connection.notice(target, text) + + @action + def servers(self) -> Sequence[dict]: + """ + Get information about the connected servers. + + :return: .. schema:: irc.IRCServerStatusSchema(many=True) + """ + bots = self._bots_by_server.values() + return IRCServerStatusSchema().dump( + { + 'server': bot.server, + 'port': bot.port, + 'alias': bot.alias, + 'real_name': bot.connection.get_server_name(), + 'nickname': bot.connection.get_nickname(), + 'is_connected': bot.connection.is_connected(), + 'connected_channels': bot.channels.keys(), + } + for bot in bots + ) + + @action + def channel(self, server: Union[str, Tuple[str, int]], channel: str) -> dict: + """ + Get information about a connected channel. + + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param channel: + :return: .. schema:: irc.IRCChannelSchema + """ + bot = self._get_bot(server) + channel_name = channel + channel = bot.channels.get(channel) + assert channel, f'Not connected to channel {channel}' + return IRCChannelSchema().dump({ + 'is_invite_only': channel.is_invite_only(), + 'is_moderated': channel.is_moderated(), + 'is_protected': channel.is_protected(), + 'is_secret': channel.is_secret(), + 'name': channel_name, + 'modes': channel.modes, + 'opers': list(channel.opers()), + 'owners': channel.owners(), + 'users': list(channel.users()), + 'voiced': list(channel.voiced()), + }) + + @action + def send_ctcp_message( + self, ctcp_type: str, body: str, server: Union[str, Tuple[str, int]], target: str + ): + """ + Send a CTCP message to a target. + + :param ctcp_type: CTCP message type. + :param body: Message content. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param target: Message target. + """ + bot = self._get_bot(server) + bot.connection.ctcp(ctcp_type, target, body) + + @action + def send_ctcp_reply( + self, body: str, server: Union[str, Tuple[str, int]], target: str + ): + """ + Send a CTCP REPLY command. + + :param body: Message content. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param target: Message target. + """ + bot = self._get_bot(server) + bot.connection.ctcp_reply(target, body) + + @action + def disconnect(self, server: Union[str, Tuple[str, int]], message: Optional[str] = None): + """ + Disconnect from a server. + + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param message: Disconnect message (default: configured ``stop_message``. + """ + bot = self._get_bot(server) + bot.connection.disconnect(message or bot.stop_message) + + @action + def invite( + self, nick: str, channel: str, server: Union[str, Tuple[str, int]] + ): + """ + Invite a nick to a channel. + + :param nick: Target IRC nick. + :param channel: Target IRC channel. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + bot.connection.invite(nick, channel) + + @action + def join(self, channel: str, server: Union[str, Tuple[str, int]]): + """ + Join a channel. + + :param channel: Target IRC channel. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + bot.connection.join(channel) + + @action + def kick( + self, nick: str, channel: str, server: Union[str, Tuple[str, int]], reason: Optional[str] = None + ): + """ + Kick a nick from a channel. + + :param nick: Target IRC nick. + :param channel: Target IRC channel. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param reason: Kick reason. + """ + bot = self._get_bot(server) + bot.connection.kick(channel, nick, reason) + + @action + def mode( + self, target: str, command: str, server: Union[str, Tuple[str, int]] + ): + """ + Send a MODE command on the selected target. + + :param target: IRC target. + :param command: Mode command. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + bot.connection.mode(target, command) + + @action + def set_nick(self, nick: str, server: Union[str, Tuple[str, int]]): + """ + Set the IRC nick. + + :param nick: New IRC nick. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + bot.connection.nick(nick) + + @action + def oper(self, nick: str, password: str, server: Union[str, Tuple[str, int]]): + """ + Send an OPER command. + + :param nick: IRC nick. + :param password: Nick password. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + bot.connection.oper(nick, password) + + @action + def part( + self, channel: Union[str, Sequence[str]], server: Union[str, Tuple[str, int]], + message: Optional[str] = None + ): + """ + Parts/exits a channel. + + :param channel: IRC channel (or list of channels). + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param message: Optional part message (default: same as the bot's ``stop_message``). + """ + bot = self._get_bot(server) + channels = [channel] if isinstance(channel, str) else channel + bot.connection.part(channels=channels, message=message or bot.stop_message) + + @action + def quit( + self, server: Union[str, Tuple[str, int]], message: Optional[str] = None + ): + """ + Send a QUIT command. + + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param message: Optional quit message (default: same as the bot's ``stop_message``). + """ + bot = self._get_bot(server) + bot.connection.quit(message=message or bot.stop_message) + + @action + def send_raw(self, message: str, server: Union[str, Tuple[str, int]]): + """ + Send a raw IRC message to a connected server. + + :param message: IRC message. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + bot.connection.send_raw(message) + + @action + def topic(self, channel: str, server: Union[str, Tuple[str, int]], topic: Optional[str] = None) -> str: + """ + Get/set the topic of an IRC channel. + + :param channel: IRC channel. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + :param topic: If specified, then set the new topic as channel topic. + Otherwise, just return the current channel topic. + """ + bot = self._get_bot(server) + with bot.event_queue('currenttopic') as evt_queue: + bot.connection.topic(channel, topic) + evt = evt_queue.get(block=True, timeout=bot.response_timeout) + return evt.arguments[1] + + @action + def whois(self, target: str, server: Union[str, Tuple[str, int]]): + """ + Send a WHOIS command for a target. + + :param target: IRC target. + :param server: IRC server, identified either by ``alias`` or ``(server, port)`` tuple. + """ + bot = self._get_bot(server) + with bot.event_queue('whoisuser') as evt_queue: + bot.connection.whois([target]) + evt = evt_queue.get(block=True, timeout=bot.response_timeout) + return evt.arguments + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/chat/irc/_bot.py b/platypush/plugins/chat/irc/_bot.py new file mode 100644 index 000000000..ac0dc97c8 --- /dev/null +++ b/platypush/plugins/chat/irc/_bot.py @@ -0,0 +1,462 @@ +import contextlib +import logging +import multiprocessing +import os +import pathlib +import re +import select +import socket +import ssl as _ssl +import struct +from typing import Iterable, Optional, Type, Sequence, Dict, Tuple + +import irc.bot +from irc.client import Connection, Event as _IRCEvent, ServerConnection, DCCConnection, ip_numstr_to_quad, \ + ip_quad_to_numstr +from irc.connection import Factory as ConnectionFactory +from irc.events import codes as irc_event_codes + +from platypush.context import get_bus +from platypush.message.event.irc import IRCEvent, IRCChannelJoinEvent, IRCChannelKickEvent, IRCDisconnectEvent, \ + IRCModeEvent, IRCNickChangeEvent, IRCPartEvent, IRCQuitEvent, IRCConnectEvent, IRCPrivateMessageEvent, \ + IRCPublicMessageEvent, IRCDCCRequestEvent, IRCDCCMessageEvent, IRCDCCFileRequestEvent, IRCCTCPMessageEvent, \ + IRCDCCFileRecvCompletedEvent, IRCDCCFileRecvCancelledEvent, IRCDCCFileSendCompletedEvent, \ + IRCDCCFileSendCancelledEvent + + +class IRCBot(irc.bot.SingleServerIRCBot): + def __init__( + self, server: str, port: int, nickname: str, alias: str, + channels: Iterable[str], + response_timeout: Optional[float], + dcc_file_transfer_timeout: Optional[float], + dcc_accept_timeout: Optional[float], + dcc_downloads_dir: str, realname: Optional[str] = None, + password: Optional[str] = None, ssl: bool = False, + ipv6: bool = False, stop_message: Optional[str] = None, + dcc_ip_whitelist: Optional[Sequence[str]] = None, + dcc_ip_blacklist: Optional[Sequence[str]] = None, + dcc_nick_whitelist: Optional[Sequence[str]] = None, + dcc_nick_blacklist: Optional[Sequence[str]] = None, + dcc_max_connections: Optional[int] = None, + ): + connection_factory = ConnectionFactory() + if ssl: + connection_factory.wrapper = _ssl.wrap_socket + if ipv6: + connection_factory.family = socket.AF_INET6 + + super().__init__( + server_list=[(server, port)], nickname=nickname, + realname=realname, connect_factory=connection_factory + ) + + self.server = server + self.port = port + self.alias = alias + self._password = password + self.channels.update({channel: None for channel in channels}) + self._stop_message = stop_message + self.dcc_ip_whitelist = set(dcc_ip_whitelist or []) + self.dcc_ip_blacklist = set(dcc_ip_blacklist or []) + self.dcc_nick_whitelist = set(dcc_nick_whitelist or []) + self.dcc_nick_blacklist = set(dcc_nick_blacklist or []) + self.dcc_downloads_dir = dcc_downloads_dir + self.response_timeout = response_timeout + self.dcc_file_transfer_timeout = dcc_file_transfer_timeout + self.dcc_accept_timeout = dcc_accept_timeout + self.dcc_max_connections = dcc_max_connections + self._dcc_send_procs: Dict[Tuple[str, int], multiprocessing.Process] = {} + self._dcc_recv_procs: Dict[Tuple[str, int], multiprocessing.Process] = {} + self._dcc_proc_completion_queue = multiprocessing.Queue() + self._dcc_processor: Optional[multiprocessing.Process] = None + self.logger = logging.getLogger(f'irc@{server}') + # Maps -> + self._pending_requests: Dict[str, multiprocessing.Queue] = {} + + @property + def stop_message(self) -> Optional[str]: + return self._stop_message + + def _post_event(self, connection: Connection, output_event_type: Type[IRCEvent], **kwargs): + if isinstance(connection, ServerConnection): + kwargs['server'] = connection.server + kwargs['port'] = connection.port + + kwargs['connected'] = connection.connected + kwargs['alias'] = self.alias + event = output_event_type(**kwargs) + get_bus().post(event) + + def on_join(self, connection: ServerConnection, event: _IRCEvent): + self._post_event(connection, IRCChannelJoinEvent, channel=event.target, nick=event.source.nick) + super()._on_join(connection, event) + + def on_kick(self, connection: ServerConnection, event: _IRCEvent): + self._post_event( + connection, IRCChannelKickEvent, channel=event.target, + source_nick=event.arguments[1], target_nick=event.arguments[0] + ) + + super()._on_kick(connection, event) + + def on_nick(self, connection: ServerConnection, event: _IRCEvent): + self._post_event( + connection, IRCNickChangeEvent, before=event.source.nick, after=event.target.nick + ) + super()._on_nick(connection, event) + + def on_mode(self, connection: ServerConnection, event: _IRCEvent): + self._post_event( + connection, IRCModeEvent, mode=event.arguments[0], + source=event.source.nick, + target_=(event.arguments[1] if len(event.arguments) > 1 else None), + channel=event.target, + ) + + super()._on_mode(connection, event) + + def on_part(self, connection: Connection, event: _IRCEvent): + self._post_event(connection, IRCPartEvent, nick=event.source.nick) + super()._on_part(connection, event) + + def on_quit(self, connection: Connection, event: _IRCEvent): + self._post_event(connection, IRCQuitEvent, nick=event.source.nick) + super()._on_quit(connection, event) + + def on_welcome(self, connection: Connection, *_): + self._post_event(connection, IRCConnectEvent) + for channel in self.channels: + self.logger.info(f'Joining channel {channel}') + self.connection.join(channel) + + @staticmethod + def on_nicknameinuse(connection: ServerConnection, *_): + connection.nick(connection.nickname + '_') + + @staticmethod + def _mentions_me(connection: ServerConnection, text: str) -> bool: + return bool(re.search(fr'(^|\s|@){connection.nickname}(:|\s|!|\?|$)', text)) + + def on_pubmsg(self, connection: ServerConnection, event: _IRCEvent): + self._post_event( + connection, IRCPublicMessageEvent, + text=event.arguments[0], nick=event.source.nick, + channel=event.target, + mentions_me=self._mentions_me(connection, event.arguments[0]) + ) + + def on_privmsg(self, connection: ServerConnection, event: _IRCEvent): + self._post_event( + connection, IRCPrivateMessageEvent, + text=event.arguments[0], nick=event.source.nick, + channel=event.target, + mentions_me=self._mentions_me(connection, event.arguments[0]) + ) + + def on_dccchat(self, connection: DCCConnection, event: _IRCEvent): + if len(event.arguments) != 2: + return + + args = event.arguments[1].split() + if len(args) == 4: + try: + address = ip_numstr_to_quad(args[2]) + port = int(args[3]) + except ValueError: + return + + nick = event.source.nick + if not self._is_dcc_connection_request_allowed(address=address, nick=nick): + self.logger.info(f'Refused DCC connection from address={address} nick={nick}') + connection.disconnect('Unauthorized peer') + return + + self._post_event( + connection, IRCDCCRequestEvent, address=address, port=port, nick=nick + ) + + self.dcc('chat').connect(address, port) + + def _read(self, sock: socket.socket, bufsize: int = 4096) -> bytes: + if self.dcc_file_transfer_timeout: + sock.setblocking(False) + has_data = select.select([sock], [], [], self.dcc_file_transfer_timeout) + if has_data[0]: + return sock.recv(bufsize) + raise TimeoutError(f'Time out ({self.dcc_file_transfer_timeout}s)') + + return sock.recv(bufsize) + + def _dcc_connect_processor(self): + while True: + proc_type, addr, port = self._dcc_proc_completion_queue.get() + conn_map = getattr(self, f'_dcc_{proc_type}_procs', {}) + conn_map.pop((addr, port), None) + + def _process_dcc_recv(self, connection: DCCConnection, filename: str, address: str, port: int, size: int): + pathlib.Path(self.dcc_downloads_dir).mkdir(parents=True, exist_ok=True) + filename = os.path.abspath(os.path.join(self.dcc_downloads_dir, filename)) + assert filename.startswith(self.dcc_downloads_dir), ( + 'Attempt to save a file outside the downloads directory' + ) + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock, \ + open(filename, 'wb') as f: + try: + sock.connect((address, port)) + processed_bytes = 0 + + while processed_bytes < size: + buf = self._read(sock) + processed_bytes += len(buf) + f.write(buf) + sock.send(f'{socket.htonl(processed_bytes)}'.encode()) + + self._post_event( + connection, IRCDCCFileRecvCompletedEvent, address=address, + port=port, file=filename, size=size + ) + except Exception as e: + self.logger.error(f'DCC transfer error from {address}:{port}: {e}') + self.logger.exception(e) + self._post_event( + connection, IRCDCCFileRecvCancelledEvent, address=address, + port=port, file=filename, error=str(e) + ) + finally: + connection.disconnect() + self._dcc_proc_completion_queue.put(('recv', address, port)) + + def _process_dcc_send(self, connection: DCCConnection, filename: str): + try: + dcc_sock, addr = connection.socket.accept() + self._set_accept_timeout(dcc_sock) + self.logger.info(f'Accepted DCC connection from {addr}') + + # Create a new server socket for the file transfer + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv_sock: + srv_sock.bind((connection.localaddress, 0)) + address, transfer_port = srv_sock.getsockname() + dcc_sock.send( + ( + f'CTCP_MESSAGE \x01' + f'DCC SEND {os.path.basename(filename)} ' + f'{ip_quad_to_numstr(address)} {transfer_port} ' + f'{os.path.getsize(filename)}\x01\r\n' + ).encode() + ) + + # Wait for a connection on the file transfer port + self._set_accept_timeout(srv_sock) + srv_sock.listen(1) + client_sock, addr = srv_sock.accept() + self.logger.info(f'DCC file transfer for {filename} accepted on {addr}') + + try: + with open(filename, 'rb') as f: + buf = f.read(4096) + while buf: + client_sock.send(buf) + recv_bytes = struct.unpack('>i', self._read(client_sock))[0] + buf = buf[recv_bytes:] + + if not buf: + buf = f.read(4096) + + self._post_event( + connection, IRCDCCFileSendCompletedEvent, + file=filename, address=address, port=transfer_port + ) + finally: + client_sock.close() + dcc_sock.close() + except Exception as e: + self.logger.error(f'DCC transfer error to {connection.peeraddress}:{connection.peerport}: {e}') + self.logger.exception(e) + self._post_event( + connection, IRCDCCFileSendCancelledEvent, address=connection.peeraddress, + port=connection.peerport, file=filename, error=str(e) + ) + finally: + connection.disconnect() + self._dcc_proc_completion_queue.put(('send', connection.localaddress, connection.localport)) + + def _set_accept_timeout(self, sock: socket.socket): + if self.dcc_accept_timeout: + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.settimeout(self.dcc_accept_timeout) + + def _stop_dcc_connection(self, address: str, port: int): + self.logger.info(f'Stopping DCC connection to {address}:{port}') + proc = self._dcc_recv_procs.pop((address, port), None) + if proc and proc.is_alive(): + proc.terminate() + + def _accept_dcc_file_request(self, connection: DCCConnection, ctcp_msg: str, nick: str): + ctcp_msg = [ + token.strip() + for token in ctcp_msg.split(' ') + if token + ] + + filename = ' '.join(ctcp_msg[2:-3]) + address, port, size = ctcp_msg[-3:] + + if not (filename and address and port): + return + + address = ip_numstr_to_quad(address) + port = int(port) + size = int(size) + + if port == 0: + self.logger.warning('Passive CTCP file transfer is currently not supported') + return + + self._post_event( + connection, IRCDCCFileRequestEvent, + address=address, + port=port, file=filename, + size=size, nick=nick + ) + + # Accept the file request + # (if we're here then the peer is whitelisted/not blacklisted) + self._dcc_recv_procs[(address, port)] = multiprocessing.Process( + target=self._process_dcc_recv, kwargs={ + 'connection': connection, + 'address': address, + 'port': port, + 'filename': filename, + 'size': size, + } + ) + + self._dcc_recv_procs[(address, port)].start() + + def _handle_ctcp_message(self, connection: DCCConnection, event: _IRCEvent): + body = event.arguments[0] + sep_pos = [i for i, ch in enumerate(body) if ch == ord(b'\x01')] + msg = [] + + try: + msg = [ + body[(0 if i == 0 else sep_pos[i - 1] + 1):pos].decode().strip() + for i, pos in enumerate(sep_pos) + ][1:] + except (ValueError, TypeError): + pass + + if not msg: + return + + # Check if this is a DCC file send request + if msg[0].startswith('DCC SEND '): + self._accept_dcc_file_request(connection, msg[0], event.source) + else: + self._post_event( + connection, IRCCTCPMessageEvent, address=event.source, message=msg + ) + + def on_dccmsg(self, connection: DCCConnection, event: _IRCEvent): + ctcp_header = b'CTCP_MESSAGE' + if event.arguments[0][:len(ctcp_header)] == ctcp_header: + return self._handle_ctcp_message(connection, event) + + self._post_event(connection, IRCDCCMessageEvent, address=event.source, body=event.arguments[0]) + + def on_disconnect(self, connection: Connection, event: _IRCEvent): + self._post_event(connection, IRCDisconnectEvent) + # Cache channels for reconnect logic + channels = {ch: None for ch in self.channels.keys()} + super()._on_disconnect(connection, event) + self.channels.update(channels) + + def on_whoisuser(self, _, event: _IRCEvent): + self._process_event_on_queue(event) + + def on_currenttopic(self, _, event: _IRCEvent): + self._process_event_on_queue(event) + + def _process_event_on_queue(self, event: _IRCEvent): + q = self._pending_requests.get(event.type) + if q: + q.put(event) + + def _on_generic_event(self, _, event: _IRCEvent): + self.logger.debug(f'Received raw unhandled IRC event on {self.server}: {event.__dict__}') + + def _is_dcc_connection_request_allowed(self, address: str, nick: str) -> bool: + if self.dcc_ip_whitelist and address not in self.dcc_ip_whitelist: + return False + if self.dcc_ip_blacklist and address in self.dcc_ip_blacklist: + return False + if self.dcc_nick_whitelist and nick not in self.dcc_nick_whitelist: + return False + if self.dcc_nick_blacklist and nick in self.dcc_nick_blacklist: + return False + + if self.dcc_max_connections and len(self._dcc_recv_procs) >= self.dcc_max_connections: + self.logger.warning( + f'Refused new DCC connection: maximum number of concurrent ' + f'connections ({self.dcc_max_connections}) reached' + ) + + return False + + return True + + def dcc_file_transfer(self, file: str, nick: str, bind_address: Optional[str] = None): + conn: DCCConnection = self.dcc('chat') + bind_address = (bind_address, 0) if bind_address else None + conn.listen(bind_address) + conn.passive = False + self.connection.privmsg( + nick, + f'\x01DCC CHAT chat {ip_quad_to_numstr(conn.localaddress)} {conn.localport}\x01' + ) + + send_proc = self._dcc_send_procs[(conn.localaddress, conn.localport)] = multiprocessing.Process( + target=self._process_dcc_send, + kwargs={ + 'connection': conn, + 'filename': file, + } + ) + + send_proc.start() + send_proc.join() + + @contextlib.contextmanager + def event_queue(self, event_type: str) -> multiprocessing.Queue: + q = self._pending_requests[event_type] = multiprocessing.Queue() + try: + yield q + finally: + q.close() + self._pending_requests.pop(event_type, None) + + def start(self): + self._dcc_processor = multiprocessing.Process(target=self._dcc_connect_processor) + self._dcc_processor.start() + + for event_code in irc_event_codes.keys(): + handler = getattr(self, f'on_{event_code}', None) + if not handler: + self.reactor.add_global_handler( + event_code, lambda conn, evt: self._on_generic_event(conn, evt) + ) + + super().start() + + def stop(self, msg: Optional[str] = None): + msg = msg or self.stop_message + for addr, port in list(self._dcc_recv_procs.keys()) + list(self._dcc_send_procs.keys()): + self._stop_dcc_connection(addr, port) + + if self._dcc_processor and self._dcc_processor.is_alive(): + self._dcc_processor.terminate() + self._dcc_processor = None + + super().disconnect(msg=msg) diff --git a/platypush/plugins/chat/irc/manifest.yaml b/platypush/plugins/chat/irc/manifest.yaml new file mode 100644 index 000000000..7bd2c8145 --- /dev/null +++ b/platypush/plugins/chat/irc/manifest.yaml @@ -0,0 +1,25 @@ +manifest: + events: + platypush.message.event.irc.IRCChannelJoinEvent: when a user joins a channel + platypush.message.event.irc.IRCChannelKickEvent: when a user is kicked from a channel + platypush.message.event.irc.IRCModeEvent: when a user/channel mode change event occurs + platypush.message.event.irc.IRCPartEvent: when a user parts a channel + platypush.message.event.irc.IRCQuitEvent: when a user quits + platypush.message.event.irc.IRCNickChangeEvent: when a user nick changes + platypush.message.event.irc.IRCConnectEvent: when the bot connects to a server + platypush.message.event.irc.IRCDisconnectEvent: when the bot disconnects from a server + platypush.message.event.irc.IRCPrivateMessageEvent: when a private message is received + platypush.message.event.irc.IRCPublicMessageEvent: when a public message is received + platypush.message.event.irc.IRCDCCRequestEvent: when a DCC connection request is received + platypush.message.event.irc.IRCDCCMessageEvent: when a DCC message is received + platypush.message.event.irc.IRCCTCPMessageEvent: when a CTCP message is received + platypush.message.event.irc.IRCDCCFileRequestEvent: when a DCC file request is received + platypush.message.event.irc.IRCDCCFileRecvCompletedEvent: when a DCC file download is completed + platypush.message.event.irc.IRCDCCFileRecvCancelledEvent: when a DCC file download is cancelled + platypush.message.event.irc.IRCDCCFileSendCompletedEvent: when a DCC file upload is completed + platypush.message.event.irc.IRCDCCFileSendCancelledEvent: when a DCC file upload is cancelled + package: platypush.plugins.chat.irc + type: plugin + install: + pip: + - irc diff --git a/platypush/plugins/gotify/manifest.yaml b/platypush/plugins/gotify/manifest.yaml index 520e9dd0d..63963da89 100644 --- a/platypush/plugins/gotify/manifest.yaml +++ b/platypush/plugins/gotify/manifest.yaml @@ -1,4 +1,5 @@ manifest: - events: {} + events: + platypush.message.event.gotify.GotifyMessageEvent: when a new message is received. package: platypush.plugins.gotify type: plugin diff --git a/platypush/schemas/irc.py b/platypush/schemas/irc.py new file mode 100644 index 000000000..2c1d27973 --- /dev/null +++ b/platypush/schemas/irc.py @@ -0,0 +1,135 @@ +import os + +from marshmallow import fields +from marshmallow.schema import Schema + +from platypush.schemas import StrippedString + + +class IRCServerSchema(Schema): + server = fields.String( + required=True, + metadata=dict( + description='Server address or hostname', + example='irc.example.org', + ) + ) + + port = fields.Int( + missing=6667, + metadata=dict( + description='IRC server port', + example=6667, + ) + ) + + password = fields.String(allow_none=True, default=None, metadata=dict(example='password')) + nickname = fields.String(required=True, metadata=dict(example='testbot')) + realname = fields.String(allow_none=True, metadata=dict(example='My Real Name')) + alias = StrippedString( + metadata=dict(description='Friendly name for this bot/server connection') + ) + + channels = fields.List( + fields.String(), + missing=list, + metadata=dict( + description='List of channels the bot will connect to', + example=['#channel1', '#channel2', '#channel3'] + ) + ) + + ssl = fields.Boolean(missing=False) + ipv6 = fields.Boolean(missing=False) + stop_message = StrippedString(missing='Application stopped', metadata=dict(description='Quit/die message')) + + dcc_ip_whitelist = fields.List( + fields.String, + missing=list, + metadata=dict( + description='If specified then only DCC connections from the IP addresses on this list will be accepted', + ) + ) + + dcc_ip_blacklist = fields.List( + fields.String, + missing=list, + metadata=dict( + description='If specified then DCC connections from the IP addresses on this list will be rejected', + ) + ) + + dcc_nick_whitelist = fields.List( + fields.String, + missing=list, + metadata=dict( + description='If specified then only DCC connections from the nicknames on this list will be accepted', + ) + ) + + dcc_nick_blacklist = fields.List( + fields.String, + missing=list, + metadata=dict( + description='If specified then DCC connections from the nicknames on this list will be rejected', + ) + ) + + dcc_downloads_dir = fields.String( + missing=os.path.join(os.path.expanduser('~'), 'Downloads'), + metadata=dict(description='DCC file transfers will be downloaded to this folder (default: ~/Downloads)'), + ) + + response_timeout = fields.Number( + missing=30., + metadata=dict( + description='How long we should wait for a response to an IRC request ' + '(default: 30 seconds)', + ) + ) + + dcc_file_transfer_timeout = fields.Number( + missing=30., + metadata=dict( + description='How long we should wait on a pending DCC file transfer with ' + 'no data being transmitted (default: 30 seconds)', + ) + ) + + dcc_accept_timeout = fields.Number( + missing=300., + metadata=dict( + description='How long we should wait on a pending DCC request ' + 'until the user accepts (default: 300 seconds)', + ) + ) + + dcc_max_connections = fields.Int( + missing=10, + metadata=dict( + description='Maximum number of concurrent DCC connections allowed on this bot (default: 10)' + ) + ) + + +class IRCServerStatusSchema(Schema): + server = StrippedString(required=True) + port = fields.Int(required=True) + alias = StrippedString() + real_name = fields.String() + nickname = fields.String() + is_connected = fields.Boolean() + connected_channels = fields.List(fields.String) + + +class IRCChannelSchema(Schema): + name = fields.String(required=True) + modes = fields.List(fields.String) + opers = fields.List(fields.String) + owners = fields.List(fields.String) + users = fields.List(fields.String) + voiced = fields.List(fields.String) + is_invite_only = fields.Boolean() + is_moderated = fields.Boolean() + is_protected = fields.Boolean() + is_secret = fields.Boolean() diff --git a/setup.py b/setup.py index 198099746..35540976f 100755 --- a/setup.py +++ b/setup.py @@ -252,5 +252,7 @@ setup( 'pca9685': ['adafruit-python-shell', 'adafruit-circuitpython-pca9685'], # Support for ngrok integration 'ngrok': ['pyngrok'], + # Support for IRC integration + 'irc': ['irc'], }, )