Added events on IMAP flagged/starred messages and extended IMAP plugin with remaining methods [closes ]

This commit is contained in:
Fabio Manganiello 2020-09-02 00:07:08 +02:00
parent 6c24783df7
commit ca168828de
4 changed files with 301 additions and 20 deletions
platypush
backend
message/event
plugins/mail

View file

@ -16,7 +16,7 @@ from sqlalchemy.ext.declarative import declarative_base
from platypush.backend import Backend
from platypush.config import Config
from platypush.context import get_plugin
from platypush.message.event.mail import MailReceivedEvent, MailSeenEvent
from platypush.message.event.mail import MailReceivedEvent, MailSeenEvent, MailFlaggedEvent, MailUnflaggedEvent
from platypush.plugins.mail import MailInPlugin, Mail
# <editor-fold desc="Database tables">
@ -30,6 +30,7 @@ class MailboxStatus(Base):
mailbox_id = Column(Integer, primary_key=True)
unseen_message_ids = Column(String, default='[]')
flagged_message_ids = Column(String, default='[]')
last_checked_date = Column(DateTime)
@ -58,10 +59,13 @@ class MailBackend(Backend):
- :class:`platypush.message.event.mail.MailReceivedEvent` when a new message is received.
- :class:`platypush.message.event.mail.MailSeenEvent` when a message is marked as seen.
- :class:`platypush.message.event.mail.MailFlaggedEvent` when a message is marked as flagged/starred.
- :class:`platypush.message.event.mail.MailUnflaggedEvent` when a message is marked as unflagged/unstarred.
"""
def __init__(self, mailboxes: List[Dict[str, Any]], timeout: Optional[int] = 60, **kwargs):
def __init__(self, mailboxes: List[Dict[str, Any]], timeout: Optional[int] = 60, poll_seconds: Optional[int] = 60,
**kwargs):
"""
:param mailboxes: List of mailboxes to be monitored. Each mailbox entry contains a ``plugin`` attribute to
identify the :class:`platypush.plugins.mail.MailInPlugin` plugin that will be used (e.g. ``mail.imap``)
@ -107,14 +111,17 @@ class MailBackend(Backend):
name: "My Local Server"
folder: "All Mail"
:param poll_seconds: How often the backend should check the mail (default: 60).
:param timeout: Connect/read timeout for a mailbox, in seconds (default: 60).
"""
self.logger.info('Initializing mail backend')
super().__init__(**kwargs)
self.poll_seconds = poll_seconds
self.mailboxes: List[Mailbox] = []
self.timeout = timeout
self._unread_msgs: List[Dict[int, Mail]] = [{}] * len(mailboxes)
self._flagged_msgs: List[Dict[int, Mail]] = [{}] * len(mailboxes)
self._db_lock = RLock()
self.workdir = os.path.join(os.path.expanduser(Config.get('workdir')), 'mail')
self.dbfile = os.path.join(self.workdir, 'backend.db')
@ -132,14 +139,14 @@ class MailBackend(Backend):
self._db = self._db_get_engine()
Base.metadata.create_all(self._db)
Session.configure(bind=self._db)
self._db_sync_mailboxes()
self._db_load_mailboxes_status()
self.logger.info('Mail backend initialized')
# <editor-fold desc="Database methods">
def _db_get_engine(self) -> engine.Engine:
return create_engine('sqlite:///{}'.format(self.dbfile), connect_args={'check_same_thread': False})
def _db_sync_mailboxes(self) -> None:
def _db_load_mailboxes_status(self) -> None:
mailbox_ids = list(range(len(self.mailboxes)))
with self._db_lock:
@ -151,13 +158,15 @@ class MailBackend(Backend):
for mbox_id, mbox in enumerate(self.mailboxes):
if mbox_id not in records:
record = MailboxStatus(mailbox_id=mbox_id, unseen_message_ids='[]')
record = MailboxStatus(mailbox_id=mbox_id, unseen_message_ids='[]', flagged_message_ids='[]')
session.add(record)
else:
record = records[mbox_id]
unseen_msg_ids = json.loads(record.unseen_message_ids or '[]')
flagged_msg_ids = json.loads(record.flagged_message_ids or '[]')
self._unread_msgs[mbox_id] = {msg_id: {} for msg_id in unseen_msg_ids}
self._flagged_msgs[mbox_id] = {msg_id: {} for msg_id in flagged_msg_ids}
session.commit()
@ -173,11 +182,15 @@ class MailBackend(Backend):
# <editor-fold desc="Parse unread messages logic">
@staticmethod
def _check_thread(q: Queue, plugin: MailInPlugin, **args):
def _check_thread(unread_queue: Queue, flagged_queue: Queue, plugin: MailInPlugin, **args):
def thread():
# noinspection PyUnresolvedReferences
unread = plugin.search_unseen_messages(**args).output
q.put({msg.id: msg for msg in unread})
unread_queue.put({msg.id: msg for msg in unread})
# noinspection PyUnresolvedReferences
flagged = plugin.search_flagged_messages(**args).output
flagged_queue.put({msg.id: msg for msg in flagged})
return thread
@ -195,34 +208,57 @@ class MailBackend(Backend):
if msg_id not in unread_msgs
}
def _get_flagged_unflagged_msgs(self, mailbox_idx: int, flagged_msgs: Dict[int, Mail]) \
-> Tuple[Dict[int, Mail], Dict[int, Mail]]:
prev_flagged_msgs = self._flagged_msgs[mailbox_idx]
return {
msg_id: flagged_msgs[msg_id]
for msg_id in flagged_msgs
if msg_id not in prev_flagged_msgs
}, {
msg_id: prev_flagged_msgs[msg_id]
for msg_id in prev_flagged_msgs
if msg_id not in flagged_msgs
}
def _process_msg_events(self, mailbox_id: int, unread: List[Mail], seen: List[Mail],
last_checked_date: Optional[datetime] = None):
flagged: List[Mail], unflagged: List[Mail], last_checked_date: Optional[datetime] = None):
for msg in unread:
if msg.date and last_checked_date and msg.date < last_checked_date:
continue
self.bus.post(MailReceivedEvent(mailbox=self.mailboxes[mailbox_id].name, msg=msg))
for msg in seen:
self.bus.post(MailSeenEvent(mailbox=self.mailboxes[mailbox_id].name, msg=msg))
def _check_mailboxes(self) -> List[Dict[int, Mail]]:
for msg in flagged:
self.bus.post(MailFlaggedEvent(mailbox=self.mailboxes[mailbox_id].name, msg=msg))
for msg in unflagged:
self.bus.post(MailUnflaggedEvent(mailbox=self.mailboxes[mailbox_id].name, msg=msg))
def _check_mailboxes(self) -> List[Tuple[Dict[int, Mail], Dict[int, Mail]]]:
workers = []
queues = []
queues: List[Tuple[Queue, Queue]] = []
results = []
for mbox in self.mailboxes:
q = Queue()
worker = Thread(target=self._check_thread(q, plugin=mbox.plugin, **mbox.args))
unread_queue, flagged_queue = [Queue()] * 2
worker = Thread(target=self._check_thread(unread_queue=unread_queue, flagged_queue=flagged_queue,
plugin=mbox.plugin, **mbox.args))
worker.start()
workers.append(worker)
queues.append(q)
queues.append((unread_queue, flagged_queue))
for worker in workers:
worker.join(timeout=self.timeout)
for i, q in enumerate(queues):
for i, (unread_queue, flagged_queue) in enumerate(queues):
try:
unread = q.get(timeout=self.timeout)
results.append(unread)
unread = unread_queue.get(timeout=self.timeout)
flagged = flagged_queue.get(timeout=self.timeout)
results.append((unread, flagged))
except Empty:
self.logger.warning('Checks on mailbox #{} timed out after {} seconds'.format(i + 1, self.timeout))
continue
@ -237,14 +273,18 @@ class MailBackend(Backend):
mailbox_statuses = self._db_get_mailbox_status(list(range(len(self.mailboxes))))
results = self._check_mailboxes()
for i, unread in enumerate(results):
for i, (unread, flagged) in enumerate(results):
unread_msgs, seen_msgs = self._get_unread_seen_msgs(i, unread)
flagged_msgs, unflagged_msgs = self._get_flagged_unflagged_msgs(i, flagged)
self._process_msg_events(i, unread=list(unread_msgs.values()), seen=list(seen_msgs.values()),
flagged=list(flagged_msgs.values()), unflagged=list(unflagged_msgs.values()),
last_checked_date=mailbox_statuses[i].last_checked_date)
self._unread_msgs[i] = unread
self._flagged_msgs[i] = flagged
records.append(MailboxStatus(mailbox_id=i,
unseen_message_ids=json.dumps([msg_id for msg_id in unread.keys()]),
flagged_message_ids=json.dumps([msg_id for msg_id in flagged.keys()]),
last_checked_date=datetime.now()))
with self._db_lock:

View file

@ -22,4 +22,18 @@ class MailSeenEvent(MailEvent):
pass
class MailFlaggedEvent(MailEvent):
"""
Triggered when a message is marked as flagged/starred.
"""
pass
class MailUnflaggedEvent(MailEvent):
"""
Triggered when a message previously marked as flagged/starred is unflagged.
"""
pass
# vim:sw=4:ts=4:et:

View file

@ -133,6 +133,9 @@ class MailInPlugin(MailPlugin, ABC):
def search_unseen_messages(self, directory: Optional[str] = None) -> list:
raise NotImplementedError()
def search_flagged_messages(self, folder: str = 'INBOX', **connect_args) -> list:
raise NotImplementedError()
@action
def get_message(self, id) -> dict:
raise NotImplementedError()

View file

@ -301,13 +301,11 @@ class MailImapPlugin(MailInPlugin):
if len(ids):
data = client.fetch(list(ids), attributes)
messages = [
return [
self._parse_message(msg_id, data[msg_id])
for msg_id in sorted(data.keys())
]
return messages
@action
def search_unseen_messages(self, folder: str = 'INBOX', **connect_args) -> List[Mail]:
"""
@ -315,6 +313,48 @@ class MailImapPlugin(MailInPlugin):
"""
return self.search(criteria='UNSEEN', directory=folder, attributes=['ALL'], **connect_args)
@action
def search_flagged_messages(self, folder: str = 'INBOX', **connect_args) -> List[Mail]:
"""
Shortcut for :meth:`.search` that returns only the flagged/starred messages.
"""
return self.search(criteria='Flagged', directory=folder, attributes=['ALL'], **connect_args)
@action
def search_starred_messages(self, folder: str = 'INBOX', **connect_args) -> List[Mail]:
"""
Shortcut for :meth:`.search` that returns only the flagged/starred messages.
"""
return self.search_flagged_messages(folder, **connect_args)
@action
def sort(self, folder: str = 'INBOX', sort_criteria: Union[str, List[str]] = 'ARRIVAL',
criteria: Union[str, List[str]] = 'ALL', **connect_args) -> List[int]:
"""
Return a list of message ids from the currently selected folder, sorted by ``sort_criteria`` and optionally
filtered by ``criteria``. Note that SORT is an extension to the IMAP4 standard so it may not be supported by
all IMAP servers.
:param folder: Folder to be searched (default: ``INBOX``).
:param sort_criteria: It may be a sequence of strings or a single string. IMAPClient will take care any required
conversions. Valid *sort_criteria* values::
.. code-block:: python
['ARRIVAL']
['SUBJECT', 'ARRIVAL']
'ARRIVAL'
'REVERSE SIZE'
:param criteria: Optional filter for the messages, as specified in :meth:`.search`.
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
:return: A list of message IDs that fit the criteria.
"""
with self.connect(**connect_args) as client:
client.select_folder(folder, readonly=True)
msg_ids = client.sort(sort_criteria=sort_criteria, criteria=criteria)
return msg_ids
@action
def get_message(self, id: int, folder: str = 'INBOX', **connect_args) -> Mail:
"""
@ -338,4 +378,188 @@ class MailImapPlugin(MailInPlugin):
return ret
@action
def create_folder(self, folder: str, **connect_args):
"""
Create a folder on the server.
:param folder: Folder name.
:param connect_args:
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.create_folder(folder)
@action
def rename_folder(self, old_name: str, new_name: str, **connect_args):
"""
Rename a folder on the server.
:param old_name: Previous name
:param new_name: New name
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.rename_folder(old_name, new_name)
@action
def delete_folder(self, folder: str, **connect_args):
"""
Delete a folder from the server.
:param folder: Folder name.
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.delete_folder(folder)
@action
def add_flags(self, messages: List[int], flags: List[str], folder: str = 'INBOX', **connect_args):
"""
Add a set of flags to the specified set of message IDs.
:param messages: List of message IDs.
:param flags: List of flags to be added. Examples:
.. code-block:: python
['Flagged']
['Seen', 'Deleted']
['Junk']
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.select_folder(folder)
client.add_flags(messages, flags)
@action
def set_flags(self, messages: List[int], flags: List[str], folder: str = 'INBOX', **connect_args):
"""
Set a set of flags to the specified set of message IDs.
:param messages: List of message IDs.
:param flags: List of flags to be added. Examples:
.. code-block:: python
['Flagged']
['Seen', 'Deleted']
['Junk']
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.select_folder(folder)
client.set_flags(messages, flags)
@action
def remove_flags(self, messages: List[int], flags: List[str], folder: str = 'INBOX', **connect_args):
"""
Remove a set of flags to the specified set of message IDs.
:param messages: List of message IDs.
:param flags: List of flags to be added. Examples:
.. code-block:: python
['Flagged']
['Seen', 'Deleted']
['Junk']
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.select_folder(folder)
client.remove_flags(messages, flags)
@action
def flag_messages(self, messages: List[int], folder: str = 'INBOX', **connect_args):
"""
Add a flag/star to the specified set of message IDs.
:param messages: List of message IDs.
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
return self.add_flags(messages, ['Flagged'], folder=folder, **connect_args)
@action
def unflag_messages(self, messages: List[int], folder: str = 'INBOX', **connect_args):
"""
Remove a flag/star from the specified set of message IDs.
:param messages: List of message IDs.
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
return self.remove_flags(messages, ['Flagged'], folder=folder, **connect_args)
@action
def flag_message(self, message: int, folder: str = 'INBOX', **connect_args):
"""
Add a flag/star to the specified set of message ID.
:param message: Message ID.
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
return self.flag_messages([message], folder=folder, **connect_args)
@action
def unflag_message(self, message: int, folder: str = 'INBOX', **connect_args):
"""
Remove a flag/star from the specified set of message ID.
:param message: Message ID.
:param folder: IMAP folder (default: ``INBOX``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
return self.unflag_messages([message], folder=folder, **connect_args)
@action
def copy_messages(self, messages: List[int], dest_folder: str, source_folder: str = 'INBOX', **connect_args):
"""
Copy a set of messages IDs from a folder to another.
:param messages: List of message IDs.
:param source_folder: Source folder.
:param dest_folder: Destination folder.
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.select_folder(source_folder)
client.copy(messages, dest_folder)
@action
def move_messages(self, messages: List[int], dest_folder: str, source_folder: str = 'INBOX', **connect_args):
"""
Move a set of messages IDs from a folder to another.
:param messages: List of message IDs.
:param source_folder: Source folder.
:param dest_folder: Destination folder.
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.select_folder(source_folder)
client.move(messages, dest_folder)
@action
def expunge_messages(self, folder: str = 'INBOX', messages: Optional[List[int]] = None, **connect_args):
"""
When ``messages`` is not set, remove all the messages from ``folder`` marked as ``Deleted``.
:param folder: IMAP folder (default: ``INBOX``).
:param messages: List of message IDs to expunge (default: all those marked as ``Deleted``).
:param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override.
"""
with self.connect(**connect_args) as client:
client.select_folder(folder)
client.expunge(messages)
# vim:sw=4:ts=4:et: