import email from typing import Optional, List, Dict, Union, Any, Tuple from imapclient import IMAPClient from imapclient.response_types import Address from platypush.plugins import action from platypush.plugins.mail import MailInPlugin, ServerInfo, Mail class MailImapPlugin(MailInPlugin): """ Plugin to interact with a mail server over IMAP. Requires: * **imapclient** (``pip install imapclient``) """ _default_port = 143 _default_ssl_port = 993 def __init__(self, server: str, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, password_cmd: Optional[str] = None, access_token: Optional[str] = None, oauth_mechanism: Optional[str] = 'XOAUTH2', oauth_vendor: Optional[str] = None, ssl: bool = False, keyfile: Optional[str] = None, certfile: Optional[str] = None, timeout: Optional[int] = 60, **kwargs): """ :param server: Server name/address. :param port: Port (default: 143 for plain, 993 for SSL). :param username: IMAP username. :param password: IMAP password. :param password_cmd: If you don't want to input your password in the configuration, run this command to fetch or decrypt the password. :param access_token: OAuth2 access token if the server supports OAuth authentication. :param oauth_mechanism: OAuth2 mechanism (default: ``XOAUTH2``). :param oauth_vendor: OAuth2 vendor (default: None). :param ssl: Use SSL (default: False). :param keyfile: Private key file for SSL connection if client authentication is required. :param certfile: SSL certificate file or chain. :param timeout: Server connect/read timeout in seconds (default: 60). """ super().__init__(**kwargs) self.server_info = self._get_server_info(server=server, port=port, username=username, password=password, password_cmd=password_cmd, ssl=ssl, keyfile=keyfile, certfile=certfile, access_token=access_token, oauth_mechanism=oauth_mechanism, oauth_vendor=oauth_vendor, timeout=timeout) def _get_server_info(self, server: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, password: Optional[str] = None, password_cmd: Optional[str] = None, access_token: Optional[str] = None, oauth_mechanism: Optional[str] = None, oauth_vendor: Optional[str] = None, ssl: Optional[bool] = None, keyfile: Optional[str] = None, certfile: Optional[str] = None, timeout: Optional[int] = None, **kwargs) -> ServerInfo: return super()._get_server_info(server=server, port=port, username=username, password=password, password_cmd=password_cmd, ssl=ssl, keyfile=keyfile, certfile=certfile, default_port=self._default_port, default_ssl_port=self._default_ssl_port, access_token=access_token, oauth_mechanism=oauth_mechanism, oauth_vendor=oauth_vendor, timeout=timeout) def connect(self, **connect_args) -> IMAPClient: info = self._get_server_info(**connect_args) self.logger.info('Connecting to {}'.format(info.server)) context = None if info.ssl: import ssl context = ssl.create_default_context() context.load_cert_chain(certfile=info.certfile, keyfile=info.keyfile) client = IMAPClient(host=info.server, port=info.port, ssl=info.ssl, ssl_context=context) if info.password: client.login(info.username, info.password) elif info.access_token: client.oauth2_login(info.username, access_token=info.access_token, mech=info.oauth_mechanism, vendor=info.oauth_vendor) return client @staticmethod def _get_folders(data: List[tuple]) -> List[Dict[str, str]]: folders = [] for line in data: (flags), delimiter, mailbox_name = line folders.append({ 'name': mailbox_name, 'flags': [flag.decode() for flag in flags], 'delimiter': delimiter.decode(), }) return folders @action def get_folders(self, folder: str = '', pattern: str = '*', **connect_args) -> List[Dict[str, str]]: """ Get the list of all the folders hosted on the server or those matching a pattern. :param folder: Base folder (default: root). :param pattern: Pattern to search (default: None). :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. :return: Example: .. code-block:: json [ { "name": "INBOX", "flags": "\\Noinferiors", "delimiter": "/" }, { "name": "Archive", "flags": "\\Noinferiors", "delimiter": "/" }, { "name": "Spam", "flags": "\\Noinferiors", "delimiter": "/" } ] """ with self.connect(**connect_args) as client: data = client.list_folders(directory=folder, pattern=pattern) return self._get_folders(data) @action def get_sub_folders(self, folder: str = '', pattern: str = '*', **connect_args) -> List[Dict[str, str]]: """ Get the list of all the sub-folders hosted on the server or those matching a pattern. :param folder: Base folder (default: root). :param pattern: Pattern to search (default: None). :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. :return: Example: .. code-block:: json [ { "name": "INBOX", "flags": "\\Noinferiors", "delimiter": "/" }, { "name": "Archive", "flags": "\\Noinferiors", "delimiter": "/" }, { "name": "Spam", "flags": "\\Noinferiors", "delimiter": "/" } ] """ with self.connect(**connect_args) as client: data = client.list_sub_folders(directory=folder, pattern=pattern) return self._get_folders(data) @staticmethod def _parse_address(imap_addr: Address) -> Dict[str, str]: return { 'name': imap_addr.name.decode() if imap_addr.name else None, 'route': imap_addr.route.decode() if imap_addr.route else None, 'email': '{name}@{host}'.format(name=imap_addr.mailbox.decode(), host=imap_addr.host.decode()) } @classmethod def _parse_addresses(cls, addresses: Optional[Tuple[Address]] = None) -> Dict[str, Dict[str, str]]: ret = {} if addresses: for addr in addresses: addr = cls._parse_address(addr) ret[addr['email']] = addr return ret @classmethod def _parse_message(cls, msg_id: int, imap_msg: Dict[bytes, Any]) -> Mail: message = { 'id': msg_id, 'seq': imap_msg[b'SEQ'], } if imap_msg.get(b'FLAGS'): message['flags'] = [flag.decode() for flag in imap_msg[b'FLAGS'] if flag] if b'INTERNALDATE' in imap_msg: message['internal_date'] = imap_msg[b'INTERNALDATE'] if b'RFC822.SIZE' in imap_msg: message['size'] = imap_msg[b'RFC822.SIZE'] if b'ENVELOPE' in imap_msg: envelope = imap_msg[b'ENVELOPE'] message['bcc'] = cls._parse_addresses(envelope.bcc) message['cc'] = cls._parse_addresses(envelope.cc) message['date'] = envelope.date message['from'] = cls._parse_addresses(envelope.from_) message['message_id'] = envelope.message_id.decode() if envelope.message_id else None message['in_reply_to'] = envelope.in_reply_to.decode() if envelope.in_reply_to else None message['reply_to'] = cls._parse_addresses(envelope.reply_to) message['sender'] = cls._parse_addresses(envelope.sender) message['subject'] = envelope.subject.decode() if envelope.subject else None message['to'] = cls._parse_addresses(envelope.to) return Mail(**message) @action def search(self, criteria: Union[str, List[str]] = 'ALL', folder: str = 'INBOX', attributes: Optional[List[str]] = None, **connect_args) -> List[Mail]: """ Search for messages on the server that fit the specified criteria. :param criteria: It should be a sequence of one or more criteria items. Each criteria item may be either unicode or bytes (default: ``ALL``). Example values:: ['UNSEEN'] ['SMALLER', 500] ['NOT', 'DELETED'] ['TEXT', 'foo bar', 'FLAGGED', 'SUBJECT', 'baz'] ['SINCE', '2020-03-14T12:13:45+00:00'] It is also possible (but not recommended) to pass the combined criteria as a single string. In this case IMAPClient won't perform quoting, allowing lower-level specification of criteria. Examples of this style:: 'UNSEEN' 'SMALLER 500' 'NOT DELETED' 'TEXT "foo bar" FLAGGED SUBJECT "baz"' 'SINCE 03-Apr-2005' To support complex search expressions, criteria lists can be nested. The following will match messages that are both not flagged and do not have "foo" in the subject:: ['NOT', ['SUBJECT', 'foo', 'FLAGGED']] :param folder: Folder to search (default: ``INBOX``). :param attributes: Attributes that should be retrieved, according to `RFC 3501 `_ (default: ``ALL`` = ``[FLAGS INTERNALDATE RFC822.SIZE ENVELOPE]``). Note that ``BODY`` will be ignored if specified here for performance reasons - use :meth:`.get_message` if you want to get the full content of a message known its ID from :meth:`.search`. :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. :return: List of messages matching the criteria. Example: .. code-block:: json [ { "id": 702, "seq": 671, "flags": [ "nonjunk" ], "internal_date": "2020-08-30T00:31:52+00:00", "size": 2908738, "bcc": {}, "cc": {}, "date": "2020-08-30T00:31:52+00:00", "from": { "test123@gmail.com": { "name": "A test", "route": null, "email": "test123@gmail.com" } }, "message_id": "", "in_reply_to": "", "reply_to": {}, "sender": { "test123@gmail.com": { "name": "A test", "route": null, "email": "test123@gmail.com" } }, "subject": "Test email", "to": { "me@gmail.com": { "name": null, "route": null, "email": "me@gmail.com" } } } ] """ if not attributes: attributes = ['ALL'] else: attributes = [attr.upper() for attr in attributes] data = {} with self.connect(**connect_args) as client: client.select_folder(folder, readonly=True) ids = client.search(criteria) if len(ids): data = client.fetch(list(ids), attributes) messages = [ self._parse_message(msg_id, data[msg_id]) for msg_id in sorted(data.keys()) ] return messages @action def search_unseen_messages(self, folder: str = 'INBOX', **connect_args) -> List[Mail]: """ Shortcut for :meth:`.search` that returns only the unread messages. """ return self.search(criteria='UNSEEN', directory=folder, attributes=['ALL'], **connect_args) @action def get_message(self, id: int, folder: str = 'INBOX', **connect_args) -> Mail: """ Get the full content of a message given the ID returned by :meth:`.search`. :param id: Message ID. :param folder: Folder name (default: ``INBOX``). :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. :return: A message in the same format as :meth:`.search`, with an added ``payload`` attribute containing the body/payload. """ with self.connect(**connect_args) as client: client.select_folder(folder, readonly=True) data = client.fetch(id, ['ALL', 'RFC822']) assert id in data, 'No such message ID: {}'.format(id) data = data[id] ret = self._parse_message(id, data) msg = email.message_from_bytes(data[b'RFC822']) ret.payload = msg.get_payload() return ret # vim:sw=4:ts=4:et: