forked from platypush/platypush
242 lines
8.5 KiB
Python
242 lines
8.5 KiB
Python
import inspect
|
|
import os
|
|
import subprocess
|
|
|
|
from abc import ABC
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from email.message import Message
|
|
from email.mime.application import MIMEApplication
|
|
from email.mime.audio import MIMEAudio
|
|
from email.mime.base import MIMEBase
|
|
from email.mime.image import MIMEImage
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from mimetypes import guess_type
|
|
from typing import Optional, List, Union, Any, Dict
|
|
|
|
from platypush.message import JSONAble
|
|
from platypush.plugins import Plugin, action
|
|
|
|
|
|
@dataclass
class ServerInfo:
    """
    Plain record holding the connection settings of a mail server.

    No field declares a default, so all of them must be supplied when the
    object is constructed (positionally, in the order below, or by keyword).
    """

    # Server name/address and port to connect to.
    server: str
    port: int

    # Login credentials.
    username: Optional[str]
    password: Optional[str]

    # SSL flag, plus key and certificate file references.
    ssl: bool
    keyfile: Optional[str]
    certfile: Optional[str]

    # OAuth-related settings.
    access_token: Optional[str]
    oauth_mechanism: Optional[str]
    oauth_vendor: Optional[str]

    # Connection timeout (NOTE(review): unit is not stated here - presumably seconds).
    timeout: Optional[int]
|
|
|
|
|
|
class Mail(JSONAble):
    """
    JSON-serializable container for the attributes of a mail message.
    """

    def __init__(self, id: int, date: datetime, size: int,
                 from_: Optional[Union[Dict[str, str], List[str]]] = None,
                 to: Optional[Union[Dict[str, str], List[str]]] = None,
                 cc: Optional[Union[Dict[str, str], List[str]]] = None,
                 bcc: Optional[Union[Dict[str, str], List[str]]] = None, subject: str = '',
                 payload: Optional[Any] = None, **kwargs):
        """
        :param id: Message ID.
        :param date: Message date.
        :param size: Message size.
        :param from_: Sender(s). If empty, the ``from`` keyword argument is used instead.
        :param to: Recipient(s), stored as passed.
        :param cc: Carbon-copy recipients (an empty list is stored if not set).
        :param bcc: Blind carbon-copy recipients (an empty list is stored if not set).
        :param subject: Message subject.
        :param payload: Message payload.
        :param kwargs: Any extra keyword argument is stored as an attribute with the same name.
        """
        self.id = id
        self.date = date
        self.size = size
        # ``from`` is a reserved word, so it can only arrive through kwargs.
        self.from_ = from_ if from_ else kwargs.get('from')
        self.to = to
        self.cc = cc if cc else []
        self.bcc = bcc if bcc else []
        self.subject = subject
        self.payload = payload

        for name in kwargs:
            setattr(self, name, kwargs[name])

    def to_json(self) -> dict:
        """
        Build a dictionary out of the public, non-callable members of this object.

        The ``from_`` attribute is exported under the ``from`` key.
        """
        exported = {}
        for name, value in inspect.getmembers(self):
            # Skip private/dunder members and anything callable (i.e. methods).
            if name.startswith('_') or callable(value):
                continue

            key = 'from' if name == 'from_' else name
            exported[key] = value

        return exported
|
|
|
|
|
|
class MailPlugin(Plugin, ABC):
    """
    Base class for mail plugins.
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: Arguments forwarded to the parent plugin constructor.
        """
        super().__init__(**kwargs)
        # Default server configuration; it is initialized to None here and used as a
        # fallback by _get_server_info when no explicit server is passed.
        self.server_info: Optional[ServerInfo] = None

    @staticmethod
    def _get_password(password: Optional[str] = None, password_cmd: Optional[str] = None) -> Optional[str]:
        """
        Get the password either from a provided string or from a password command.

        :param password: Password provided as a string. Returned as it is if no ``password_cmd`` is set.
        :param password_cmd: Shell command (run through ``sh -c``) whose standard output is the password.
        :return: The password, or ``None`` if the command produced no output.
        """
        if not password_cmd:
            return password

        # The exit status is deliberately not checked: a command with no output yields None.
        result = subprocess.run(['sh', '-c', password_cmd], stdout=subprocess.PIPE, check=False)

        # Commands usually terminate their output with a newline, which is not part of the
        # password. Only trailing line terminators are removed, other whitespace is preserved.
        output = result.stdout.decode().rstrip('\r\n')
        return output or None

    @staticmethod
    def _get_path(path: str) -> str:
        """
        :param path: Path to be expanded.
        :return: The absolute path, with ``~`` expanded to the user's home directory.
        """
        return os.path.abspath(os.path.expanduser(path))

    def _get_server_info(self, server: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None,
                         password: Optional[str] = None, password_cmd: Optional[str] = None,
                         ssl: Optional[bool] = None, keyfile: Optional[str] = None, certfile: Optional[str] = None,
                         access_token: Optional[str] = None, oauth_mechanism: Optional[str] = None,
                         oauth_vendor: Optional[str] = None, default_port: Optional[int] = None,
                         default_ssl_port: Optional[int] = None, timeout: Optional[int] = None, **kwargs) \
            -> ServerInfo:
        """
        Build the server configuration to be used for a connection.

        If ``server`` is specified then a new :class:`ServerInfo` is built from the arguments. Otherwise the
        configuration stored on ``self.server_info`` is returned, if set. If neither is available, a
        :class:`ServerInfo` built from the arguments (with no server) is returned.

        :param port: Server port. If not set, ``default_ssl_port`` is used if ``ssl`` is true,
            ``default_port`` otherwise.
        :param password: Password, as a string.
        :param password_cmd: Command used to retrieve the password (see :meth:`_get_password`).
        :param kwargs: Extra arguments are accepted and ignored.
        :raises AssertionError: If the stored configuration is used but it has no server set.
        """
        # Fall back to the stored configuration before doing any work, so the password
        # command isn't executed when its output would be discarded anyway.
        if not server and self.server_info:
            assert self.server_info.server, 'No server specified'
            return self.server_info

        if not port:
            port = default_ssl_port if ssl else default_port

        return ServerInfo(server=server, port=port, username=username,
                          password=self._get_password(password, password_cmd), ssl=ssl, keyfile=keyfile,
                          certfile=certfile, access_token=access_token, oauth_mechanism=oauth_mechanism,
                          oauth_vendor=oauth_vendor, timeout=timeout)
|
|
|
|
|
|
class MailInPlugin(MailPlugin, ABC):
    """
    Base class for mail in plugins.

    Every method below is a stub that raises ``NotImplementedError``.
    """

    @action
    def get_folders(self) -> list:
        """
        Stub: always raises ``NotImplementedError`` in this base class.
        """
        raise NotImplementedError

    @action
    def get_sub_folders(self) -> list:
        """
        Stub: always raises ``NotImplementedError`` in this base class.
        """
        raise NotImplementedError

    @action
    def search(self, criteria: str, directory: Optional[str] = None) -> list:
        """
        Stub: always raises ``NotImplementedError`` in this base class.

        :param criteria: Search criteria (format not defined here).
        :param directory: Presumably the mail folder to search - to be confirmed against implementations.
        """
        raise NotImplementedError

    @action
    def search_unseen_messages(self, directory: Optional[str] = None) -> list:
        """
        Stub: always raises ``NotImplementedError`` in this base class.

        :param directory: Presumably the mail folder to search - to be confirmed against implementations.
        """
        raise NotImplementedError

    # NOTE(review): unlike its siblings, this method is not decorated with @action.
    def search_flagged_messages(self, folder: str = 'INBOX', **connect_args) -> list:
        """
        Stub: always raises ``NotImplementedError`` in this base class.

        :param folder: Folder name (default: ``INBOX``).
        :param connect_args: Extra connection arguments (not used by this stub).
        """
        raise NotImplementedError

    @action
    def get_message(self, id) -> dict:
        """
        Stub: always raises ``NotImplementedError`` in this base class.

        :param id: Message ID.
        """
        raise NotImplementedError
|
|
|
|
|
|
class MailOutPlugin(MailPlugin, ABC):
    """
    Base class for mail out plugins.
    """

    def send_message(self, message: Message, **connect_args):
        """
        Send a message object. Stub: always raises ``NotImplementedError`` in this base class.

        :param message: Message to be sent.
        :param connect_args: Extra connection arguments.
        """
        raise NotImplementedError()

    @staticmethod
    def _file_to_part(file: str) -> MIMEBase:
        """
        Build a MIME part out of a file, guessing its content type from the file name.

        :param file: Path of the file to be attached.
        :return: The MIME part containing the file content.
        """
        from email import encoders

        maintype, subtype = 'application', 'octet-stream'
        mime_type, encoding = guess_type(file)

        # The second value returned by guess_type is the content *encoding* (e.g. ``gzip`` for
        # ``.tar.gz``), not a MIME parameter. An encoded file can't be declared with the type of
        # its decoded content, so it's sent as a generic binary attachment.
        if mime_type and not encoding:
            maintype, subtype = mime_type.split('/', 1)

        if maintype == 'text':
            with open(file, 'r') as f:
                return MIMEText(f.read(), subtype)

        with open(file, 'rb') as f:
            data = f.read()

        name = os.path.basename(file)
        type_classes = {
            'application': MIMEApplication,
            'audio': MIMEAudio,
            'image': MIMEImage,
        }

        type_class = type_classes.get(maintype)
        if type_class:
            return type_class(data, subtype, Name=name)

        # Any other main type (e.g. video/*) has no dedicated class: build a generic part that
        # preserves the real main type instead of relabelling it as application/*.
        part = MIMEBase(maintype, subtype, Name=name)
        part.set_payload(data)
        encoders.encode_base64(part)
        return part

    @staticmethod
    def _join_addresses(addresses: Optional[Union[str, List[str]]]) -> str:
        """
        Convert a list of addresses, or an already comma-separated string, to a header value.
        """
        if not addresses:
            return ''

        # A string is already in its final form: joining it would split it character by character.
        if isinstance(addresses, str):
            return addresses

        return ', '.join(addresses)

    @classmethod
    def create_message(cls, to: Union[str, List[str]], from_: Optional[str] = None,
                       cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None,
                       subject: str = '', body: str = '', body_type: str = 'plain',
                       attachments: Optional[List[str]] = None, headers: Optional[Dict[str, str]] = None) -> Message:
        """
        Build a message object out of its fields.

        :param to: Receiver(s), as comma-separated strings or list.
        :param from_: Sender email address (required).
        :param cc: Carbon-copy addresses, as comma-separated strings or list
        :param bcc: Blind carbon-copy addresses, as comma-separated strings or list
        :param subject: Mail subject.
        :param body: Mail body.
        :param body_type: Mail body type, as a subtype of ``text/`` (e.g. ``html``) (default: ``plain``).
        :param attachments: List of attachment files to send.
        :param headers: Key-value map of headers to be added.
        :return: A multipart message if attachments are specified, a text message otherwise.
        :raises AssertionError: If ``from_`` is not specified or an attachment is not an existing file.
        """
        assert from_, 'from/from_ field not specified'

        text_part = MIMEText(body, body_type)
        if attachments:
            msg = MIMEMultipart()
            msg.attach(text_part)

            for attachment in attachments:
                attachment = cls._get_path(attachment)
                assert os.path.isfile(attachment), 'No such file: {}'.format(attachment)
                part = cls._file_to_part(attachment)
                part['Content-Disposition'] = 'attachment; filename="{}"'.format(os.path.basename(attachment))
                msg.attach(part)
        else:
            msg = text_part

        msg['From'] = from_
        msg['To'] = cls._join_addresses(to)
        # Cc and Bcc are always set, with an empty value if not specified.
        msg['Cc'] = cls._join_addresses(cc)
        msg['Bcc'] = cls._join_addresses(bcc)
        msg['Subject'] = subject

        for name, value in (headers or {}).items():
            msg.add_header(name, value)

        return msg

    @action
    def send(self, to: Union[str, List[str]], from_: Optional[str] = None,
             cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None,
             subject: str = '', body: str = '', body_type: str = 'plain', attachments: Optional[List[str]] = None,
             headers: Optional[Dict[str, str]] = None, **connect_args):
        """
        Send an email through the specified SMTP sender.

        :param to: Receiver(s), as comma-separated strings or list.
        :param from_: Sender email address (``from`` is also supported outside of Python contexts).
        :param cc: Carbon-copy addresses, as comma-separated strings or list
        :param bcc: Blind carbon-copy addresses, as comma-separated strings or list
        :param subject: Mail subject.
        :param body: Mail body.
        :param body_type: Mail body type, as a subtype of ``text/`` (e.g. ``html``) (default: ``plain``).
        :param attachments: List of attachment files to send.
        :param headers: Key-value map of headers to be added.
        :param connect_args: Parameters for ``.connect()``, if you want to override the default server configuration.
        """
        # ``from`` is a reserved word in Python, so it can only be passed through connect_args.
        if not from_ and 'from' in connect_args:
            from_ = connect_args.pop('from')

        msg = self.create_message(to=to, from_=from_, cc=cc, bcc=bcc, subject=subject, body=body, body_type=body_type,
                                  attachments=attachments, headers=headers)

        return self.send_message(msg, **connect_args)
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|