From 8a84a36905c7e146a24d13cd4109a7fac0a77dab Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 19 Jan 2024 03:11:55 +0100 Subject: [PATCH 1/4] [#351] Merged `google.pubsub` plugin and backend. Closes: #351 --- docs/source/backends.rst | 1 - .../platypush/backend/google.pubsub.rst | 5 - platypush/backend/google/__init__.py | 0 platypush/backend/google/pubsub/__init__.py | 88 --------- platypush/backend/google/pubsub/manifest.yaml | 9 - platypush/plugins/google/pubsub/__init__.py | 171 +++++++++++++++--- platypush/plugins/google/pubsub/manifest.yaml | 3 +- platypush/utils/mock/modules.py | 3 + 8 files changed, 150 insertions(+), 130 deletions(-) delete mode 100644 docs/source/platypush/backend/google.pubsub.rst delete mode 100644 platypush/backend/google/__init__.py delete mode 100644 platypush/backend/google/pubsub/__init__.py delete mode 100644 platypush/backend/google/pubsub/manifest.yaml diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 3a152791..8b29fb7f 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -10,7 +10,6 @@ Backends platypush/backend/button.flic.rst platypush/backend/camera.pi.rst platypush/backend/chat.telegram.rst - platypush/backend/google.pubsub.rst platypush/backend/gps.rst platypush/backend/http.rst platypush/backend/mail.rst diff --git a/docs/source/platypush/backend/google.pubsub.rst b/docs/source/platypush/backend/google.pubsub.rst deleted file mode 100644 index 76bffd3c..00000000 --- a/docs/source/platypush/backend/google.pubsub.rst +++ /dev/null @@ -1,5 +0,0 @@ -``google.pubsub`` -=================================== - -.. automodule:: platypush.backend.google.pubsub - :members: diff --git a/platypush/backend/google/__init__.py b/platypush/backend/google/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/platypush/backend/google/pubsub/__init__.py b/platypush/backend/google/pubsub/__init__.py deleted file mode 100644 index 39a6d3e1..00000000 --- a/platypush/backend/google/pubsub/__init__.py +++ /dev/null @@ -1,88 +0,0 @@ -import json - -from typing import Optional, List - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.google.pubsub import GooglePubsubMessageEvent - - -class GooglePubsubBackend(Backend): - """ - Subscribe to a list of topics on a Google Pub/Sub instance. See - :class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your - project and credentials file. - """ - - def __init__( - self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs - ): - """ - :param topics: List of topics to subscribe. You can either specify the full topic name in the format - ``projects//topics/``, where ```` must be the ID of your - Google Pub/Sub project, or just ```` - in such case it's implied that you refer to the - ``topic_name`` under the ``project_id`` of your service credentials. - :param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the - ``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``). - """ - - super().__init__(*args, name='GooglePubSub', **kwargs) - self.topics = topics - - if credentials_file: - self.credentials_file = credentials_file - else: - plugin = self._get_plugin() - self.credentials_file = plugin.credentials_file - - @staticmethod - def _get_plugin(): - plugin = get_plugin('google.pubsub') - assert plugin, 'google.pubsub plugin not enabled' - return plugin - - def _message_callback(self, topic): - def callback(msg): - data = msg.data.decode() - try: - data = json.loads(data) - except Exception as e: - self.logger.debug('Not a valid JSON: %s: %s', data, e) - - msg.ack() - self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data)) - - return callback - - def run(self): - # noinspection PyPackageRequirements - from google.cloud import pubsub_v1 - - # noinspection PyPackageRequirements - from google.api_core.exceptions import AlreadyExists - - super().run() - plugin = self._get_plugin() - project_id = plugin.get_project_id() - credentials = plugin.get_credentials(plugin.subscriber_audience) - subscriber = pubsub_v1.SubscriberClient(credentials=credentials) - - for topic in self.topics: - prefix = f'projects/{project_id}/topics/' - if not topic.startswith(prefix): - topic = f'{prefix}{topic}' - subscription_name = '/'.join( - [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] - ) - - try: - subscriber.create_subscription(name=subscription_name, topic=topic) - except AlreadyExists: - pass - - subscriber.subscribe(subscription_name, self._message_callback(topic)) - - self.wait_stop() - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/google/pubsub/manifest.yaml b/platypush/backend/google/pubsub/manifest.yaml deleted file mode 100644 index c7fd5c41..00000000 --- a/platypush/backend/google/pubsub/manifest.yaml +++ /dev/null @@ -1,9 +0,0 @@ -manifest: - events: - platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message - is received ona subscribed topic. - install: - pip: - - google-cloud-pubsub - package: platypush.backend.google.pubsub - type: backend diff --git a/platypush/plugins/google/pubsub/__init__.py b/platypush/plugins/google/pubsub/__init__.py index 1834844d..7b66b6be 100644 --- a/platypush/plugins/google/pubsub/__init__.py +++ b/platypush/plugins/google/pubsub/__init__.py @@ -1,59 +1,87 @@ import json import os +from threading import RLock +from typing import Iterable, Optional -from platypush.plugins import Plugin, action +from google.auth import jwt +from google.cloud import pubsub_v1 as pubsub # pylint: disable=no-name-in-module +from google.api_core.exceptions import AlreadyExists, NotFound + +from platypush.config import Config +from platypush.message.event.google.pubsub import GooglePubsubMessageEvent +from platypush.plugins import RunnablePlugin, action -class GooglePubsubPlugin(Plugin): +class GooglePubsubPlugin(RunnablePlugin): """ - Send messages over a Google pub/sub instance. - You'll need a Google Cloud active project and a set of credentials to use this plugin: + Publishes and subscribes to Google Pub/Sub topics. + + You'll need a Google Cloud active project and a set of credentials to use + this plugin: 1. Create a project on the `Google Cloud console - `_ if you don't have - one already. + `_ if you don't have + one already. 2. In the `Google Cloud API console - `_ - create a new service account key. Select "New Service Account", choose - the role "Pub/Sub Editor" and leave the key type as JSON. + `_ + create a new service account key. Select "New Service Account", choose + the role "Pub/Sub Editor" and leave the key type as JSON. 3. Download the JSON service credentials file. By default Platypush - will look for the credentials file under - ``~/.credentials/platypush/google/pubsub.json``. + will look for the credentials file under + ``/credentials/google/pubsub.json``. """ publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber' default_credentials_file = os.path.join( - os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json' + Config.get_workdir(), 'credentials', 'google', 'pubsub.json' ) - def __init__(self, credentials_file: str = default_credentials_file, **kwargs): + def __init__( + self, + credentials_file: str = default_credentials_file, + topics: Iterable[str] = (), + **kwargs, + ): """ :param credentials_file: Path to the JSON credentials file for Google pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``) + :param topics: List of topics to subscribe. You can either specify the + full topic name in the format + ``projects//topics/``, where + ```` must be the ID of your Google Pub/Sub project, or + just ```` - in such case it's implied that you refer to + the ``topic_name`` under the ``project_id`` of your service + credentials. """ super().__init__(**kwargs) self.credentials_file = credentials_file self.project_id = self.get_project_id() + self.topics = topics + self._subscriber: Optional[pubsub.SubscriberClient] = None + self._subscriber_lock = RLock() def get_project_id(self): with open(self.credentials_file) as f: return json.load(f).get('project_id') def get_credentials(self, audience: str): - from google.auth import jwt - return jwt.Credentials.from_service_account_file( self.credentials_file, audience=audience ) + def _norm_topic(self, topic: str): + if not topic.startswith(f'projects/{self.project_id}/topics/'): + topic = f'projects/{self.project_id}/topics/{topic}' + return topic + @action - def send_message(self, topic: str, msg, **kwargs): + def publish(self, topic: str, msg, **kwargs): """ - Sends a message to a topic + Publish a message to a topic :param topic: Topic/channel where the message will be delivered. You can either specify the full topic name in the format @@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin): :param msg: Message to be sent. It can be a list, a dict, or a Message object :param kwargs: Extra arguments to be passed to .publish() """ - from google.cloud import pubsub_v1 - from google.api_core.exceptions import AlreadyExists - credentials = self.get_credentials(self.publisher_audience) - publisher = pubsub_v1.PublisherClient(credentials=credentials) - - if not topic.startswith(f'projects/{self.project_id}/topics/'): - topic = f'projects/{self.project_id}/topics/{topic}' + publisher = pubsub.PublisherClient(credentials=credentials) + topic = self._norm_topic(topic) try: - publisher.create_topic(topic) + publisher.create_topic(name=topic) except AlreadyExists: - pass + self.logger.debug('Topic %s already exists', topic) if isinstance(msg, (int, float)): msg = str(msg) @@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin): publisher.publish(topic, msg, **kwargs) + @action + def send_message(self, topic: str, msg, **kwargs): + """ + Alias for :meth:`.publish` + """ + self.publish(topic=topic, msg=msg, **kwargs) + + @action + def subscribe(self, topic: str): + """ + Subscribe to a topic. + + :param topic: Topic/channel where the message will be delivered. You + can either specify the full topic name in the format + ``projects//topics/``, where + ```` must be the ID of your Google Pub/Sub project, or + just ```` - in such case it's implied that you refer to + the ``topic_name`` under the ``project_id`` of your service + credentials. + """ + assert self._subscriber, 'Subscriber not initialized' + topic = self._norm_topic(topic) + subscription_name = '/'.join( + [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] + ) + + try: + self._subscriber.create_subscription(name=subscription_name, topic=topic) + except AlreadyExists: + self.logger.debug('Subscription %s already exists', subscription_name) + + self._subscriber.subscribe(subscription_name, self._message_callback(topic)) + + @action + def unsubscribe(self, topic: str): + """ + Unsubscribe from a topic. + + :param topic: Topic/channel where the message will be delivered. You + can either specify the full topic name in the format + ``projects//topics/``, where + ```` must be the ID of your Google Pub/Sub project, or + just ```` - in such case it's implied that you refer to + the ``topic_name`` under the ``project_id`` of your service + credentials. + """ + assert self._subscriber, 'Subscriber not initialized' + topic = self._norm_topic(topic) + subscription_name = '/'.join( + [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] + ) + + try: + self._subscriber.delete_subscription(subscription=subscription_name) + except NotFound: + self.logger.debug('Subscription %s not found', subscription_name) + + def _message_callback(self, topic): + def callback(msg): + data = msg.data.decode() + try: + data = json.loads(data) + except Exception as e: + self.logger.debug('Not a valid JSON: %s: %s', data, e) + + msg.ack() + self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data)) + + return callback + + def main(self): + credentials = self.get_credentials(self.subscriber_audience) + with self._subscriber_lock: + self._subscriber = pubsub.SubscriberClient(credentials=credentials) + + for topic in self.topics: + self.subscribe(topic=topic) + + self.wait_stop() + with self._subscriber_lock: + self._close() + + def _close(self): + with self._subscriber_lock: + if self._subscriber: + try: + self._subscriber.close() + except Exception as e: + self.logger.debug('Error while closing the subscriber: %s', e) + + self._subscriber = None + + def stop(self): + self._close() + super().stop() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/google/pubsub/manifest.yaml b/platypush/plugins/google/pubsub/manifest.yaml index 360fba64..b16e30bc 100644 --- a/platypush/plugins/google/pubsub/manifest.yaml +++ b/platypush/plugins/google/pubsub/manifest.yaml @@ -1,5 +1,6 @@ manifest: - events: {} + events: + - platypush.message.event.google.pubsub.GooglePubsubMessageEvent install: apk: - py3-google-api-python-client diff --git a/platypush/utils/mock/modules.py b/platypush/utils/mock/modules.py index 1a192170..6af74355 100644 --- a/platypush/utils/mock/modules.py +++ b/platypush/utils/mock/modules.py @@ -35,10 +35,13 @@ mock_imports = [ "gi", "gi.repository", "google", + "google.api_core", + "google.auth", "google.assistant.embedded", "google.assistant.library", "google.assistant.library.event", "google.assistant.library.file_helpers", + "google.cloud", "google.oauth2.credentials", "googlesamples", "googlesamples.assistant.grpc.audio_helpers", -- 2.45.1 From d82a5ecb1e75f7ad24bbef6143b7119477c2ae0e Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 19 Jan 2024 21:54:49 +0100 Subject: [PATCH 2/4] [#356] Merged `adafruit.io` plugin and backend. --- docs/source/backends.rst | 1 - docs/source/platypush/backend/adafruit.io.rst | 6 - platypush/backend/adafruit/__init__.py | 0 platypush/backend/adafruit/io/__init__.py | 97 ------ platypush/backend/adafruit/io/manifest.yaml | 12 - platypush/message/event/adafruit.py | 13 +- platypush/plugins/adafruit/io/__init__.py | 327 +++++++++++------- platypush/plugins/adafruit/io/manifest.yaml | 5 +- 8 files changed, 211 insertions(+), 250 deletions(-) delete mode 100644 docs/source/platypush/backend/adafruit.io.rst delete mode 100644 platypush/backend/adafruit/__init__.py delete mode 100644 platypush/backend/adafruit/io/__init__.py delete mode 100644 platypush/backend/adafruit/io/manifest.yaml diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 8b29fb7f..1d1612ab 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -6,7 +6,6 @@ Backends :maxdepth: 1 :caption: Backends: - platypush/backend/adafruit.io.rst platypush/backend/button.flic.rst platypush/backend/camera.pi.rst platypush/backend/chat.telegram.rst diff --git a/docs/source/platypush/backend/adafruit.io.rst b/docs/source/platypush/backend/adafruit.io.rst deleted file mode 100644 index d4bd4cc4..00000000 --- a/docs/source/platypush/backend/adafruit.io.rst +++ /dev/null @@ -1,6 +0,0 @@ -``adafruit.io`` -================================= - -.. automodule:: platypush.backend.adafruit.io - :members: - diff --git a/platypush/backend/adafruit/__init__.py b/platypush/backend/adafruit/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/platypush/backend/adafruit/io/__init__.py b/platypush/backend/adafruit/io/__init__.py deleted file mode 100644 index a22856ae..00000000 --- a/platypush/backend/adafruit/io/__init__.py +++ /dev/null @@ -1,97 +0,0 @@ -from typing import Optional - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.adafruit import ( - ConnectedEvent, - DisconnectedEvent, - FeedUpdateEvent, -) - - -class AdafruitIoBackend(Backend): - """ - Backend that listens to messages received over the Adafruit IO message queue - - Requires: - - * The :class:`platypush.plugins.adafruit.io.AdafruitIoPlugin` plugin to - be active and configured. - """ - - def __init__(self, feeds, *args, **kwargs): - """ - :param feeds: List of feed IDs to monitor - :type feeds: list[str] - """ - - super().__init__(*args, **kwargs) - from Adafruit_IO import MQTTClient - - self.feeds = feeds - self._client: Optional[MQTTClient] = None - - def _init_client(self): - if self._client: - return - - from Adafruit_IO import MQTTClient - - plugin = get_plugin('adafruit.io') - if not plugin: - raise RuntimeError('Adafruit IO plugin not configured') - - # noinspection PyProtectedMember - self._client = MQTTClient(plugin._username, plugin._key) - self._client.on_connect = self.on_connect() - self._client.on_disconnect = self.on_disconnect() - self._client.on_message = self.on_message() - - def on_connect(self): - def _handler(client): - for feed in self.feeds: - client.subscribe(feed) - self.bus.post(ConnectedEvent()) - - return _handler - - def on_disconnect(self): - def _handler(*_, **__): - self.bus.post(DisconnectedEvent()) - - return _handler - - def on_message(self, *_, **__): - # noinspection PyUnusedLocal - def _handler(client, feed, data): - try: - data = float(data) - except Exception as e: - self.logger.debug('Not a number: {}: {}'.format(data, e)) - - self.bus.post(FeedUpdateEvent(feed=feed, data=data)) - - return _handler - - def run(self): - super().run() - - self.logger.info( - ('Initialized Adafruit IO backend, listening on ' + 'feeds {}').format( - self.feeds - ) - ) - - while not self.should_stop(): - try: - self._init_client() - # noinspection PyUnresolvedReferences - self._client.connect() - # noinspection PyUnresolvedReferences - self._client.loop_blocking() - except Exception as e: - self.logger.exception(e) - self._client = None - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/adafruit/io/manifest.yaml b/platypush/backend/adafruit/io/manifest.yaml deleted file mode 100644 index e019ab9c..00000000 --- a/platypush/backend/adafruit/io/manifest.yaml +++ /dev/null @@ -1,12 +0,0 @@ -manifest: - events: - platypush.message.event.adafruit.ConnectedEvent: when thebackend connects to the - Adafruit queue - platypush.message.event.adafruit.DisconnectedEvent: when thebackend disconnects - from the Adafruit queue - platypush.message.event.adafruit.FeedUpdateEvent: when anupdate event is received - on a monitored feed - install: - pip: [] - package: platypush.backend.adafruit.io - type: backend diff --git a/platypush/message/event/adafruit.py b/platypush/message/event/adafruit.py index fea8e2a0..341c24fe 100644 --- a/platypush/message/event/adafruit.py +++ b/platypush/message/event/adafruit.py @@ -1,27 +1,28 @@ from platypush.message.event import Event -class ConnectedEvent(Event): +class AdafruitConnectedEvent(Event): """ - Event triggered when the backend connects to the Adafruit message queue + Event triggered when the backend connects to the Adafruit message queue. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -class DisconnectedEvent(Event): +class AdafruitDisconnectedEvent(Event): """ - Event triggered when the backend disconnects from the Adafruit message queue + Event triggered when the backend disconnects from the Adafruit message + queue. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) -class FeedUpdateEvent(Event): +class AdafruitFeedUpdateEvent(Event): """ - Event triggered upon Adafruit IO feed update + Event triggered when a message is received on a subscribed Adafruit feed. """ def __init__(self, feed, data, *args, **kwargs): diff --git a/platypush/plugins/adafruit/io/__init__.py b/platypush/plugins/adafruit/io/__init__.py index 31dcbfae..23d08de3 100644 --- a/platypush/plugins/adafruit/io/__init__.py +++ b/platypush/plugins/adafruit/io/__init__.py @@ -1,26 +1,30 @@ -import ast import statistics -import json import time -from threading import Thread, Lock +from queue import Empty, Queue +from threading import Thread +from typing import Iterable, Optional, Union -from platypush.context import get_backend -from platypush.plugins import Plugin, action - -data_throttler_lock = None +from platypush.message.event.adafruit import ( + AdafruitConnectedEvent, + AdafruitDisconnectedEvent, + AdafruitFeedUpdateEvent, +) +from platypush.plugins import RunnablePlugin, action -class AdafruitIoPlugin(Plugin): +class AdafruitIoPlugin(RunnablePlugin): """ - This plugin allows you to interact with the Adafruit IO - , a cloud-based message queue and storage. - You can send values to feeds on your Adafruit IO account and read the - values of those feeds as well through any device. + This plugin allows you to interact with `Adafruit IO + `_, a cloud-based message queue and storage. You + can use this plugin to send and receive data to topics connected to your + Adafruit IO account. - Some example usages:: + Some example usages: - # Send the temperature value for a connected sensor to the "temperature" feed + .. code-block:: javascript + + // Send the temperature value for a connected sensor to the "temperature" feed { "type": "request", "action": "adafruit.io.send", @@ -30,7 +34,7 @@ class AdafruitIoPlugin(Plugin): } } - # Receive the most recent temperature value + // Receive the most recent temperature value { "type": "request", "action": "adafruit.io.receive", @@ -38,164 +42,194 @@ class AdafruitIoPlugin(Plugin): "feed": "temperature" } } + """ - _DATA_THROTTLER_QUEUE = 'platypush/adafruit.io' - - def __init__(self, username, key, throttle_seconds=None, **kwargs): + def __init__( + self, + username: str, + key: str, + feeds: Iterable[str] = (), + throttle_interval: Optional[float] = None, + **kwargs + ): """ :param username: Your Adafruit username - :type username: str - :param key: Your Adafruit IO key - :type key: str - - :param throttle_seconds: If set, then instead of sending the values directly over ``send`` the plugin will - first collect all the samples within the specified period and then dispatch them to Adafruit IO. - You may want to set it if you have data sources providing a lot of data points and you don't want to hit - the throttling limitations of Adafruit. - :type throttle_seconds: float + :param feeds: List of feeds to subscribe to. If not set, then the plugin + will not subscribe to any feed unless instructed to do so, neither + it will emit any event. + :param throttle_interval: If set, then instead of sending the values + directly over ``send`` the plugin will first collect all the + samples within the specified period and then dispatch them to + Adafruit IO. You may want to set it if you have data sources + providing a lot of data points and you don't want to hit the + throttling limitations of Adafruit. """ - from Adafruit_IO import Client + from Adafruit_IO import Client, MQTTClient - global data_throttler_lock super().__init__(**kwargs) self._username = username self._key = key + self.feeds = feeds self.aio = Client(username=username, key=key) - self.throttle_seconds = throttle_seconds + self._mqtt_client: Optional[MQTTClient] = None + self.throttle_interval = throttle_interval + self._data_throttler_thread: Optional[Thread] = None + self._data_throttler_queue = Queue() - if not data_throttler_lock: - data_throttler_lock = Lock() + @property + def _mqtt(self): + if not self._mqtt_client: + from Adafruit_IO import MQTTClient - if self.throttle_seconds and not data_throttler_lock.locked(): - self._get_redis() - self.logger.info('Starting Adafruit IO throttler thread') - data_throttler_lock.acquire(False) - self.data_throttler = Thread(target=self._data_throttler()) - self.data_throttler.start() + self._mqtt_client = MQTTClient(self._username, self._key) + self._mqtt_client.on_connect = self._on_connect # type: ignore + self._mqtt_client.on_disconnect = self._on_disconnect # type: ignore + self._mqtt_client.on_message = self._on_message # type: ignore - @staticmethod - def _get_redis(): - from redis import Redis + return self._mqtt_client - redis_args = { - 'host': 'localhost', - } + def _on_connect(self, *_, **__): + assert self._mqtt_client, 'MQTT client not initialized' + for feed in self.feeds: + self._mqtt_client.subscribe(feed) + self._bus.post(AdafruitConnectedEvent()) - redis_backend = get_backend('redis') - if redis_backend: - redis_args = get_backend('redis').redis_args - redis_args['socket_timeout'] = 1 - return Redis(**redis_args) + def _on_disconnect(self, *_, **__): + self._bus.post(AdafruitDisconnectedEvent()) + + def _on_message(self, _, feed, data, *__): + try: + data = float(data) + except (TypeError, ValueError): + pass + + self._bus.post(AdafruitFeedUpdateEvent(feed=feed, data=data)) + + def _subscribe(self, feed: str): + assert self._mqtt_client, 'MQTT client not initialized' + self._mqtt_client.subscribe(feed) + + def _unsubscribe(self, feed: str): + assert self._mqtt_client, 'MQTT client not initialized' + self._mqtt_client.unsubscribe(feed) def _data_throttler(self): - from redis.exceptions import TimeoutError as QueueTimeoutError + from Adafruit_IO import ThrottlingError - def run(): - from Adafruit_IO import ThrottlingError + if not self.throttle_interval: + return - redis = self._get_redis() - last_processed_batch_timestamp = None - data = {} + last_processed_batch_timestamp = None + data = {} - try: - while True: - try: - new_data = ast.literal_eval( - redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8') - ) + try: + while not self.should_stop(): + try: + new_data = self._data_throttler_queue.get(timeout=1) + for key, value in new_data.items(): + data.setdefault(key, []).append(value) + except Empty: + continue - for (key, value) in new_data.items(): - data.setdefault(key, []).append(value) - except QueueTimeoutError: - pass + should_process = data and ( + last_processed_batch_timestamp is None + or time.time() - last_processed_batch_timestamp + >= self.throttle_interval + ) - if data and ( - last_processed_batch_timestamp is None - or time.time() - last_processed_batch_timestamp - >= self.throttle_seconds - ): - last_processed_batch_timestamp = time.time() - self.logger.info('Processing feeds batch for Adafruit IO') + if not should_process: + continue - for (feed, values) in data.items(): - if values: - value = statistics.mean(values) + last_processed_batch_timestamp = time.time() + self.logger.info('Processing feeds batch for Adafruit IO') - try: - self.send(feed, value, enqueue=False) - except ThrottlingError: - self.logger.warning( - 'Adafruit IO throttling threshold hit, taking a nap ' - + 'before retrying' - ) - time.sleep(self.throttle_seconds) + for feed, values in data.items(): + if values: + value = statistics.mean(values) - data = {} - except Exception as e: - self.logger.exception(e) + try: + self.send(feed, value, enqueue=False) + except ThrottlingError: + self.logger.warning( + 'Adafruit IO throttling threshold hit, taking a nap ' + + 'before retrying' + ) + self.wait_stop(self.throttle_interval) - return run + data = {} + except Exception as e: + self.logger.exception(e) @action - def send(self, feed, value, enqueue=True): + def send(self, feed: str, value: Union[int, float, str], enqueue: bool = True): """ - Send a value to an Adafruit IO feed + Send a value to an Adafruit IO feed. - :param feed: Feed name - :type feed: str - - :param value: Value to send - :type value: Numeric or string - - :param enqueue: If throttle_seconds is set, this method by default will append values to the throttling queue + :param feed: Feed name. + :param value: Value to send. + :param enqueue: If throttle_interval is set, this method by default will append values to the throttling queue to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to override the behaviour and send the message directly instead. - :type enqueue: bool """ - if not self.throttle_seconds or not enqueue: + if not self.throttle_interval or not enqueue: # If no throttling is configured, or enqueue is false then send the value directly to Adafruit self.aio.send(feed, value) else: - # Otherwise send it to the Redis queue to be picked up by the throttler thread - redis = self._get_redis() - redis.rpush(self._DATA_THROTTLER_QUEUE, json.dumps({feed: value})) + # Otherwise send it to the queue to be picked up by the throttler thread + self._data_throttler_queue.put({feed: value}) @action - def send_location_data(self, feed, lat, lon, ele, value): + def send_location_data( + self, + feed: str, + latitude: float, + longitude: float, + elevation: float, + value: Optional[Union[int, float, str]] = None, + ): """ Send location data to an Adafruit IO feed :param feed: Feed name - :type feed: str - :param lat: Latitude - :type lat: float - :param lon: Longitude - :type lon: float - :param ele: Elevation - :type ele: float - - :param value: Value to send - :type value: Numeric or string + :param value: Extra value to attach to the record """ self.aio.send_data( feed=feed, value=value, metadata={ - 'lat': lat, - 'lon': lon, - 'ele': ele, + 'lat': latitude, + 'lon': longitude, + 'ele': elevation, }, ) + @action + def subscribe(self, feed: str): + """ + Subscribe to a feed. + + :param feed: Feed name + """ + self._subscribe(feed) + + @action + def unsubscribe(self, feed: str): + """ + Unsubscribe from a feed. + + :param feed: Feed name + """ + self._unsubscribe(feed) + @classmethod def _cast_value(cls, value): try: @@ -220,17 +254,14 @@ class AdafruitIoPlugin(Plugin): ] @action - def receive(self, feed, limit=1): + def receive(self, feed: str, limit: int = 1): """ Receive data from the specified Adafruit IO feed :param feed: Feed name - :type feed: str - :param limit: Maximum number of data points to be returned. If None, all the values in the feed will be returned. Default: 1 (return most recent value) - :type limit: int """ if limit == 1: @@ -241,42 +272,84 @@ class AdafruitIoPlugin(Plugin): return values[:limit] if limit else values @action - def receive_next(self, feed): + def receive_next(self, feed: str): """ Receive the next unprocessed data point from a feed :param feed: Feed name - :type feed: str """ values = self._convert_data_to_dict(self.aio.receive_next(feed)) return values[0] if values else None @action - def receive_previous(self, feed): + def receive_previous(self, feed: str): """ Receive the last processed data point from a feed :param feed: Feed name - :type feed: str """ values = self._convert_data_to_dict(self.aio.receive_previous(feed)) return values[0] if values else None @action - def delete(self, feed, data_id): + def delete(self, feed: str, data_id: str): """ Delete a data point from a feed :param feed: Feed name - :type feed: str - :param data_id: Data point ID to remove - :type data_id: str """ self.aio.delete(feed, data_id) + def main(self): + if self.throttle_interval: + self._data_throttler_thread = Thread(target=self._data_throttler) + self._data_throttler_thread.start() + + try: + while not self.should_stop(): + cur_wait = 1 + max_wait = 60 + + try: + self._mqtt.connect() + cur_wait = 1 + self._mqtt.loop_blocking() + except Exception as e: + self.logger.warning( + 'Adafruit IO connection error: %s, retrying in %d seconds', + e, + cur_wait, + ) + + self.logger.exception(e) + self.wait_stop(cur_wait) + cur_wait = min(cur_wait * 2, max_wait) + finally: + self._stop_mqtt() + finally: + self._stop_data_throttler() + + def _stop_mqtt(self): + if self._mqtt_client: + self._mqtt_client.disconnect() + self._mqtt_client = None + + def _stop_data_throttler(self): + if self._data_throttler_thread: + self._data_throttler_thread.join() + self._data_throttler_thread = None + + def _stop(self): + self._stop_mqtt() + self._stop_data_throttler() + + def stop(self): + self._stop() + super().stop() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/adafruit/io/manifest.yaml b/platypush/plugins/adafruit/io/manifest.yaml index b346fa3c..968826cc 100644 --- a/platypush/plugins/adafruit/io/manifest.yaml +++ b/platypush/plugins/adafruit/io/manifest.yaml @@ -1,5 +1,8 @@ manifest: - events: {} + events: + - platypush.message.event.adafruit.AdafruitConnectedEvent + - platypush.message.event.adafruit.AdafruitDisconnectedEvent + - platypush.message.event.adafruit.AdafruitFeedUpdateEvent install: pip: - adafruit-io -- 2.45.1 From d4fb35105bf5e1e177f594895ca664167041af29 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 19 Jan 2024 22:29:29 +0100 Subject: [PATCH 3/4] FIX: Redis pub/sub error can also raise a `ValueError` on close. --- platypush/backend/http/app/mixins/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/platypush/backend/http/app/mixins/__init__.py b/platypush/backend/http/app/mixins/__init__.py index b036e1e0..2c97cb45 100644 --- a/platypush/backend/http/app/mixins/__init__.py +++ b/platypush/backend/http/app/mixins/__init__.py @@ -125,7 +125,7 @@ class PubSubMixin: continue yield Message(data=msg.get('data', b''), channel=channel) - except (AttributeError, RedisConnectionError): + except (AttributeError, ValueError, RedisConnectionError): return def _pubsub_close(self): -- 2.45.1 From 446b10d0050006648d1390d6fd142da885f19d7b Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Sat, 3 Feb 2024 21:16:24 +0100 Subject: [PATCH 4/4] [#348] Major refactor of the `mail` plugins and `mail` backend/plugin merge. --- docs/source/backends.rst | 1 - docs/source/platypush/backend/mail.rst | 5 - docs/source/platypush/plugins/mail.imap.rst | 5 - docs/source/platypush/plugins/mail.rst | 5 + docs/source/platypush/plugins/mail.smtp.rst | 5 - docs/source/plugins.rst | 3 +- platypush/backend/mail/__init__.py | 357 ----- platypush/backend/mail/manifest.yaml | 10 - platypush/config/config.yaml | 67 + platypush/message/event/mail.py | 31 +- platypush/plugins/mail/__init__.py | 1245 ++++++++++++++--- platypush/plugins/mail/_account.py | 106 ++ platypush/plugins/mail/_model/__init__.py | 14 + .../plugins/mail/_model/_config/__init__.py | 8 + .../plugins/mail/_model/_config/_account.py | 43 + .../plugins/mail/_model/_config/_server.py | 22 + platypush/plugins/mail/_model/_mail.py | 254 ++++ platypush/plugins/mail/_model/_transport.py | 25 + platypush/plugins/mail/_plugin/__init__.py | 12 + platypush/plugins/mail/_plugin/_base.py | 130 ++ platypush/plugins/mail/_plugin/_in.py | 125 ++ platypush/plugins/mail/_plugin/_out.py | 138 ++ platypush/plugins/mail/_plugin/_utils.py | 40 + platypush/plugins/mail/_utils.py | 73 + platypush/plugins/mail/imap/__init__.py | 671 ++------- platypush/plugins/mail/imap/manifest.yaml | 7 - platypush/plugins/mail/manifest.yaml | 20 + platypush/plugins/mail/smtp/__init__.py | 115 +- platypush/plugins/mail/smtp/manifest.yaml | 6 - platypush/plugins/mailgun/__init__.py | 58 +- setup.py | 4 +- 31 files changed, 2376 insertions(+), 1229 deletions(-) delete mode 100644 docs/source/platypush/backend/mail.rst delete mode 100644 docs/source/platypush/plugins/mail.imap.rst create mode 100644 docs/source/platypush/plugins/mail.rst delete mode 100644 docs/source/platypush/plugins/mail.smtp.rst delete mode 100644 platypush/backend/mail/__init__.py delete mode 100644 platypush/backend/mail/manifest.yaml create mode 100644 platypush/plugins/mail/_account.py create mode 100644 platypush/plugins/mail/_model/__init__.py create mode 100644 platypush/plugins/mail/_model/_config/__init__.py create mode 100644 platypush/plugins/mail/_model/_config/_account.py create mode 100644 platypush/plugins/mail/_model/_config/_server.py create mode 100644 platypush/plugins/mail/_model/_mail.py create mode 100644 platypush/plugins/mail/_model/_transport.py create mode 100644 platypush/plugins/mail/_plugin/__init__.py create mode 100644 platypush/plugins/mail/_plugin/_base.py create mode 100644 platypush/plugins/mail/_plugin/_in.py create mode 100644 platypush/plugins/mail/_plugin/_out.py create mode 100644 platypush/plugins/mail/_plugin/_utils.py create mode 100644 platypush/plugins/mail/_utils.py delete mode 100644 platypush/plugins/mail/imap/manifest.yaml create mode 100644 platypush/plugins/mail/manifest.yaml delete mode 100644 platypush/plugins/mail/smtp/manifest.yaml diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 1d1612ab..09bd8f5c 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -11,7 +11,6 @@ Backends platypush/backend/chat.telegram.rst platypush/backend/gps.rst platypush/backend/http.rst - platypush/backend/mail.rst platypush/backend/midi.rst platypush/backend/music.mopidy.rst platypush/backend/music.mpd.rst diff --git a/docs/source/platypush/backend/mail.rst b/docs/source/platypush/backend/mail.rst deleted file mode 100644 index 2ba71fe6..00000000 --- a/docs/source/platypush/backend/mail.rst +++ /dev/null @@ -1,5 +0,0 @@ -``mail`` -========================== - -.. automodule:: platypush.backend.mail - :members: diff --git a/docs/source/platypush/plugins/mail.imap.rst b/docs/source/platypush/plugins/mail.imap.rst deleted file mode 100644 index b9f75f73..00000000 --- a/docs/source/platypush/plugins/mail.imap.rst +++ /dev/null @@ -1,5 +0,0 @@ -``mail.imap`` -=============================== - -.. automodule:: platypush.plugins.mail.imap - :members: diff --git a/docs/source/platypush/plugins/mail.rst b/docs/source/platypush/plugins/mail.rst new file mode 100644 index 00000000..f692733a --- /dev/null +++ b/docs/source/platypush/plugins/mail.rst @@ -0,0 +1,5 @@ +``mail`` +======== + +.. automodule:: platypush.plugins.mail + :members: diff --git a/docs/source/platypush/plugins/mail.smtp.rst b/docs/source/platypush/plugins/mail.smtp.rst deleted file mode 100644 index cdb8a722..00000000 --- a/docs/source/platypush/plugins/mail.smtp.rst +++ /dev/null @@ -1,5 +0,0 @@ -``mail.smtp`` -=============================== - -.. automodule:: platypush.plugins.mail.smtp - :members: diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index 070ff267..4458165b 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -62,8 +62,7 @@ Plugins platypush/plugins/log.http.rst platypush/plugins/logger.rst platypush/plugins/luma.oled.rst - platypush/plugins/mail.imap.rst - platypush/plugins/mail.smtp.rst + platypush/plugins/mail.rst platypush/plugins/mailgun.rst platypush/plugins/mastodon.rst platypush/plugins/matrix.rst diff --git a/platypush/backend/mail/__init__.py b/platypush/backend/mail/__init__.py deleted file mode 100644 index 1616c233..00000000 --- a/platypush/backend/mail/__init__.py +++ /dev/null @@ -1,357 +0,0 @@ -import json -import os -import pathlib - -from dataclasses import dataclass -from datetime import datetime -from queue import Queue, Empty -from threading import Thread, RLock -from typing import List, Dict, Any, Optional, Tuple - -from sqlalchemy import engine, create_engine, Column, Integer, String, DateTime -from sqlalchemy.orm import sessionmaker, scoped_session - -from platypush.backend import Backend -from platypush.common.db import declarative_base -from platypush.config import Config -from platypush.context import get_plugin -from platypush.message.event.mail import ( - MailReceivedEvent, - MailSeenEvent, - MailFlaggedEvent, - MailUnflaggedEvent, -) -from platypush.plugins.mail import MailInPlugin, Mail - -# -Base = declarative_base() -Session = scoped_session(sessionmaker()) - - -class MailboxStatus(Base): - """Models the MailboxStatus table, containing information about the state of a monitored mailbox.""" - - __tablename__ = 'MailboxStatus' - - mailbox_id = Column(Integer, primary_key=True) - unseen_message_ids = Column(String, default='[]') - flagged_message_ids = Column(String, default='[]') - last_checked_date = Column(DateTime) - - -# - - -# -@dataclass -class Mailbox: - plugin: MailInPlugin - name: str - args: dict - - -# - - -class MailBackend(Backend): - """ - This backend can subscribe to one or multiple mail servers and trigger events when new messages are received or - messages are marked as seen. - - It requires at least one plugin that extends :class:`platypush.plugins.mail.MailInPlugin` (e.g. ``mail.imap``) to - be installed. - """ - - def __init__( - self, - mailboxes: List[Dict[str, Any]], - timeout: Optional[int] = 60, - poll_seconds: Optional[int] = 60, - **kwargs - ): - """ - :param mailboxes: List of mailboxes to be monitored. Each mailbox entry contains a ``plugin`` attribute to - identify the :class:`platypush.plugins.mail.MailInPlugin` plugin that will be used (e.g. ``mail.imap``) - and the arguments that will be passed to :meth:`platypush.plugins.mail.MailInPlugin.search_unseen_messages`. - The ``name`` parameter can be used to identify this mailbox in the relevant events, otherwise - ``Mailbox #{id}`` will be used as a name. Example configuration: - - .. code-block:: yaml - - backend.mail: - mailboxes: - - plugin: mail.imap - name: "My Local Server" - username: me@mydomain.com - password: my-imap-password - server: localhost - ssl: true - folder: "All Mail" - - - plugin: mail.imap - name: "GMail" - username: me@gmail.com - password: my-google-password - server: imap.gmail.com - ssl: true - folder: "INBOX" - - If you have a default configuration available for a mail plugin you can implicitly reuse it without - replicating it here. Example: - - .. code-block:: yaml - - mail.imap: - username: me@mydomain.com - password: my-imap-password - server: localhost - ssl: true - - backend.mail: - mailboxes: - # The mail.imap default configuration will be used - - plugin: mail.imap - name: "My Local Server" - folder: "All Mail" - - :param poll_seconds: How often the backend should check the mail (default: 60). - :param timeout: Connect/read timeout for a mailbox, in seconds (default: 60). - """ - self.logger.info('Initializing mail backend') - - super().__init__(**kwargs) - self.poll_seconds = poll_seconds - self.mailboxes: List[Mailbox] = [] - self.timeout = timeout - self._unread_msgs: List[Dict[int, Mail]] = [{}] * len(mailboxes) - self._flagged_msgs: List[Dict[int, Mail]] = [{}] * len(mailboxes) - self._db_lock = RLock() - self.workdir = os.path.join(os.path.expanduser(Config.get('workdir')), 'mail') - self.dbfile = os.path.join(self.workdir, 'backend.db') - - # Parse mailboxes - for i, mbox in enumerate(mailboxes): - assert ( - 'plugin' in mbox - ), 'No plugin attribute specified for mailbox n.{}'.format(i) - plugin = get_plugin(mbox.pop('plugin')) - assert isinstance(plugin, MailInPlugin), '{} is not a MailInPlugin'.format( - plugin - ) - name = mbox.pop('name') if 'name' in mbox else 'Mailbox #{}'.format(i + 1) - self.mailboxes.append(Mailbox(plugin=plugin, name=name, args=mbox)) - - # Configure/sync db - pathlib.Path(self.workdir).mkdir(parents=True, exist_ok=True, mode=0o750) - self._db = self._db_get_engine() - Base.metadata.create_all(self._db) - Session.configure(bind=self._db) - self._db_load_mailboxes_status() - self.logger.info('Mail backend initialized') - - # - def _db_get_engine(self) -> engine.Engine: - return create_engine( - 'sqlite:///{}'.format(self.dbfile), - connect_args={'check_same_thread': False}, - ) - - def _db_load_mailboxes_status(self) -> None: - mailbox_ids = list(range(len(self.mailboxes))) - - with self._db_lock: - session = Session() - records = { - record.mailbox_id: record - for record in session.query(MailboxStatus) - .filter(MailboxStatus.mailbox_id.in_(mailbox_ids)) - .all() - } - - for mbox_id, _ in enumerate(self.mailboxes): - if mbox_id not in records: - record = MailboxStatus( - mailbox_id=mbox_id, - unseen_message_ids='[]', - flagged_message_ids='[]', - ) - session.add(record) - else: - record = records[mbox_id] - - unseen_msg_ids = json.loads(record.unseen_message_ids or '[]') - flagged_msg_ids = json.loads(record.flagged_message_ids or '[]') - self._unread_msgs[mbox_id] = {msg_id: {} for msg_id in unseen_msg_ids} - self._flagged_msgs[mbox_id] = {msg_id: {} for msg_id in flagged_msg_ids} - - session.commit() - - def _db_get_mailbox_status( - self, mailbox_ids: List[int] - ) -> Dict[int, MailboxStatus]: - with self._db_lock: - session = Session() - return { - record.mailbox_id: record - for record in session.query(MailboxStatus) - .filter(MailboxStatus.mailbox_id.in_(mailbox_ids)) - .all() - } - - # - - # - @staticmethod - def _check_thread( - unread_queue: Queue, flagged_queue: Queue, plugin: MailInPlugin, **args - ): - def thread(): - # noinspection PyUnresolvedReferences - unread = plugin.search_unseen_messages(**args).output - unread_queue.put({msg.id: msg for msg in unread}) - - # noinspection PyUnresolvedReferences - flagged = plugin.search_flagged_messages(**args).output - flagged_queue.put({msg.id: msg for msg in flagged}) - - return thread - - def _get_unread_seen_msgs( - self, mailbox_idx: int, unread_msgs: Dict[int, Mail] - ) -> Tuple[Dict[int, Mail], Dict[int, Mail]]: - prev_unread_msgs = self._unread_msgs[mailbox_idx] - - return { - msg_id: unread_msgs[msg_id] - for msg_id in unread_msgs - if msg_id not in prev_unread_msgs - }, { - msg_id: prev_unread_msgs[msg_id] - for msg_id in prev_unread_msgs - if msg_id not in unread_msgs - } - - def _get_flagged_unflagged_msgs( - self, mailbox_idx: int, flagged_msgs: Dict[int, Mail] - ) -> Tuple[Dict[int, Mail], Dict[int, Mail]]: - prev_flagged_msgs = self._flagged_msgs[mailbox_idx] - - return { - msg_id: flagged_msgs[msg_id] - for msg_id in flagged_msgs - if msg_id not in prev_flagged_msgs - }, { - msg_id: prev_flagged_msgs[msg_id] - for msg_id in prev_flagged_msgs - if msg_id not in flagged_msgs - } - - def _process_msg_events( - self, - mailbox_id: int, - unread: List[Mail], - seen: List[Mail], - flagged: List[Mail], - unflagged: List[Mail], - last_checked_date: Optional[datetime] = None, - ): - for msg in unread: - if msg.date and last_checked_date and msg.date < last_checked_date: - continue - self.bus.post( - MailReceivedEvent(mailbox=self.mailboxes[mailbox_id].name, message=msg) - ) - - for msg in seen: - self.bus.post( - MailSeenEvent(mailbox=self.mailboxes[mailbox_id].name, message=msg) - ) - - for msg in flagged: - self.bus.post( - MailFlaggedEvent(mailbox=self.mailboxes[mailbox_id].name, message=msg) - ) - - for msg in unflagged: - self.bus.post( - MailUnflaggedEvent(mailbox=self.mailboxes[mailbox_id].name, message=msg) - ) - - def _check_mailboxes(self) -> List[Tuple[Dict[int, Mail], Dict[int, Mail]]]: - workers = [] - queues: List[Tuple[Queue, Queue]] = [] - results = [] - - for mbox in self.mailboxes: - unread_queue, flagged_queue = [Queue()] * 2 - worker = Thread( - target=self._check_thread( - unread_queue=unread_queue, - flagged_queue=flagged_queue, - plugin=mbox.plugin, - **mbox.args - ) - ) - worker.start() - workers.append(worker) - queues.append((unread_queue, flagged_queue)) - - for worker in workers: - worker.join(timeout=self.timeout) - - for i, (unread_queue, flagged_queue) in enumerate(queues): - try: - unread = unread_queue.get(timeout=self.timeout) - flagged = flagged_queue.get(timeout=self.timeout) - results.append((unread, flagged)) - except Empty: - self.logger.warning( - 'Checks on mailbox #{} timed out after {} seconds'.format( - i + 1, self.timeout - ) - ) - continue - - return results - - # - - # - def loop(self): - records = [] - mailbox_statuses = self._db_get_mailbox_status(list(range(len(self.mailboxes)))) - results = self._check_mailboxes() - - for i, (unread, flagged) in enumerate(results): - unread_msgs, seen_msgs = self._get_unread_seen_msgs(i, unread) - flagged_msgs, unflagged_msgs = self._get_flagged_unflagged_msgs(i, flagged) - self._process_msg_events( - i, - unread=list(unread_msgs.values()), - seen=list(seen_msgs.values()), - flagged=list(flagged_msgs.values()), - unflagged=list(unflagged_msgs.values()), - last_checked_date=mailbox_statuses[i].last_checked_date, - ) - - self._unread_msgs[i] = unread - self._flagged_msgs[i] = flagged - records.append( - MailboxStatus( - mailbox_id=i, - unseen_message_ids=json.dumps(list(unread.keys())), - flagged_message_ids=json.dumps(list(flagged.keys())), - last_checked_date=datetime.now(), - ) - ) - - with self._db_lock: - session = Session() - for record in records: - session.merge(record) - session.commit() - - # - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/mail/manifest.yaml b/platypush/backend/mail/manifest.yaml deleted file mode 100644 index a471c664..00000000 --- a/platypush/backend/mail/manifest.yaml +++ /dev/null @@ -1,10 +0,0 @@ -manifest: - events: - platypush.message.event.mail.MailFlaggedEvent: when a message is marked as flagged/starred. - platypush.message.event.mail.MailReceivedEvent: when a new message is received. - platypush.message.event.mail.MailSeenEvent: when a message is marked as seen. - platypush.message.event.mail.MailUnflaggedEvent: when a message is marked as unflagged/unstarred. - install: - pip: [] - package: platypush.backend.mail - type: backend diff --git a/platypush/config/config.yaml b/platypush/config/config.yaml index 263e54ad..44a74d8e 100644 --- a/platypush/config/config.yaml +++ b/platypush/config/config.yaml @@ -799,6 +799,73 @@ backend.http: # proximity: 10.0 ### +### ----------------------------------------- +### Example configuration of the mail plugin. +### ----------------------------------------- +# +# mail: +# # Display name to be used for outgoing emails. Default: +# # the `from` parameter will be used from :meth:`.send`, +# # and, if missing, the username from the account configuration +# # will be used. +# display_name: My Name +# +# # How often we should poll for updates (default: 60 seconds) +# poll_interval: 60 +# +# # Connection timeout (default: 20 seconds) +# # Can be overridden on a per-account basis +# timeout: 20 +# +# accounts: +# - name: "My Local Account" +# username: me@mydomain.com +# password: my-password +# +# # The default flag sets this account as the default one +# # for mail retrieval and sending if no account is +# # specified on an action. If multiple accounts are set +# # and none is set as default, and no account is specified +# # on an action, then the first configured account will be +# # used. +# default: true +# # Domain to be used for outgoing emails. Default: inferred +# # from the account configuration +# domain: example.com +# +# +# # Alternatively, you can run an external command +# # to get the password +# # password_cmd: "pass show mail/example.com" +# +# # Path to a custom certfile if the mail server uses a +# # self-signed certificate +# # certfile: /path/to/certfile +# +# # Path to a custom keyfile if the mail server requires +# # client authentication. It requires certfile to be set +# # too +# # keyfile: /path/to/keyfile +# +# incoming: +# # Supported protocols: imap, imaps +# server: imaps://mail.example.com:993 +# +# outgoing: +# # The `incoming` and `outgoing` configurations can +# # override the global `username` and `password` and +# # other authentication parameters of the account +# username: me +# password: my-smtp-password +# +# # Supported protocols: smtp, smtps, smtp+starttls, +# server: smtps://mail.example.com:465 +# +# # These folders will be monitored for new messages +# monitor_folders: +# - All Mail +### + ### -------------------------------- ### Some text-to-speech integrations ### -------------------------------- diff --git a/platypush/message/event/mail.py b/platypush/message/event/mail.py index b4268f6a..533d4245 100644 --- a/platypush/message/event/mail.py +++ b/platypush/message/event/mail.py @@ -1,40 +1,39 @@ -from typing import Optional - from platypush.message.event import Event -from platypush.plugins.mail import Mail class MailEvent(Event): - def __init__(self, mailbox: str, message: Optional[Mail] = None, *args, **kwargs): - super().__init__(*args, mailbox=mailbox, message=message or {}, **kwargs) - - -class MailReceivedEvent(MailEvent): """ - Triggered when a new email is received. + Base class for mail events. """ - pass + + def __init__(self, *args, account: str, folder: str, message, **kwargs): + super().__init__( + *args, account=account, folder=folder, message=message, **kwargs + ) -class MailSeenEvent(MailEvent): +class UnseenMailEvent(MailEvent): + """ + Triggered when a new email is received or marked as unseen. + """ + + +class SeenMailEvent(MailEvent): """ Triggered when a previously unseen email is seen. """ - pass -class MailFlaggedEvent(MailEvent): +class FlaggedMailEvent(MailEvent): """ Triggered when a message is marked as flagged/starred. """ - pass -class MailUnflaggedEvent(MailEvent): +class UnflaggedMailEvent(MailEvent): """ Triggered when a message previously marked as flagged/starred is unflagged. """ - pass # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/__init__.py b/platypush/plugins/mail/__init__.py index 599da2f2..195058ce 100644 --- a/platypush/plugins/mail/__init__.py +++ b/platypush/plugins/mail/__init__.py @@ -1,225 +1,977 @@ -import inspect import os -import subprocess +import pathlib +import time -from dataclasses import dataclass -from datetime import datetime -from email.message import Message -from email.mime.application import MIMEApplication -from email.mime.audio import MIMEAudio -from email.mime.base import MIMEBase -from email.mime.image import MIMEImage -from email.mime.multipart import MIMEMultipart -from email.mime.text import MIMEText -from mimetypes import guess_type -from typing import Optional, List, Union, Any, Dict +from collections import defaultdict +from threading import Thread, RLock +from typing import Any, Dict, List, Optional, Union, Collection -from platypush.message import JSONAble -from platypush.plugins import Plugin, action +from platypush.config import Config +from platypush.message.event.mail import ( + FlaggedMailEvent, + SeenMailEvent, + UnflaggedMailEvent, + UnseenMailEvent, +) +from platypush.plugins import RunnablePlugin, action + +from ._account import Account +from ._model import FolderStatus, Mail, MailFlagType, AccountsStatus +from ._plugin import MailInPlugin, MailOutPlugin + +AccountType = Union[str, int, Account] +AccountFolderChanges = Dict[str, Dict[MailFlagType, Dict[int, bool]]] -@dataclass -class ServerInfo: - server: str - port: int - username: Optional[str] - password: Optional[str] - ssl: bool - keyfile: Optional[str] - certfile: Optional[str] - access_token: Optional[str] - oauth_mechanism: Optional[str] - oauth_vendor: Optional[str] - timeout: Optional[int] +class MailPlugin(RunnablePlugin): + """ + Plugin to: + - Monitor one or more mailboxes, and emit events when new messages are + received, seen, flagged or unflagged. -class Mail(JSONAble): - def __init__(self, id: int, date: datetime, size: int, - from_: Optional[Union[Dict[str, str], List[str]]] = None, - to: Optional[Union[Dict[str, str], List[str]]] = None, - cc: Optional[Union[Dict[str, str], List[str]]] = None, - bcc: Optional[Union[Dict[str, str], List[str]]] = None, subject: str = '', - payload: Optional[Any] = None, **kwargs): - self.id = id - self.date = date - self.size = size - self.from_ = from_ or kwargs.get('from') - self.to = to - self.cc = cc or [] - self.bcc = bcc or [] - self.subject = subject - self.payload = payload + - Send mail messages. - for k, v in kwargs.items(): - setattr(self, k, v) + - Search for messages in a mailbox. - def to_json(self) -> dict: - return { - k if k != 'from_' else 'from': v - for k, v in dict(inspect.getmembers(self)).items() - if not k.startswith('_') and not callable(v) + """ + + def __init__( + self, + accounts: List[Dict[str, Any]], + timeout: float = 20.0, + poll_interval: float = 60.0, + **kwargs, + ): + """ + Example accounts configuration: + + .. code-block:: yaml + + mail: + # Display name to be used for outgoing emails. Default: + # the `from` parameter will be used from :meth:`.send`, + # and, if missing, the username from the account configuration + # will be used. + display_name: My Name + + # How often we should poll for updates (default: 60 seconds) + poll_interval: 60 + + # Connection timeout (default: 20 seconds) + # Can be overridden on a per-account basis + timeout: 20 + + # Domain to be used for outgoing emails. Default: inferred + # from the account configuration + domain: example.com + + accounts: + - name: "My Local Account" + username: me@mydomain.com + password: my-password + + # The default flag sets this account as the default one + # for mail retrieval and sending if no account is + # specified on an action. If multiple accounts are set + # and none is set as default, and no account is specified + # on an action, then the first configured account will be + # used. + default: true + + # Alternatively, you can run an external command + # to get the password + # password_cmd: "pass show mail/example.com" + + # Path to a custom certfile if the mail server uses a + # self-signed certificate + # certfile: /path/to/certfile + + # Path to a custom keyfile if the mail server requires + # client authentication. It requires certfile to be set + # too + # keyfile: /path/to/keyfile + + incoming: + # Supported protocols: imap, imaps + server: imaps://mail.example.com:993 + + outgoing: + # The `incoming` and `outgoing` configurations can + # override the global `username` and `password` and + # other authentication parameters of the account + username: me + password: my-smtp-password + + # Supported protocols: smtp, smtps, smtp+starttls, + server: smtps://mail.example.com:465 + + # These folders will be monitored for new messages + monitor_folders: + - All Mail + + - name: "GMail" + username: me@gmail.com + + # Access token, if the mail server supports OAuth2 + # access_token: my-access-token + + # OAuth mechanism, if the mail server supports OAuth2 + # (default: XOAUTH2) + # oauth_mechanism: XOAUTH2 + + # OAuth vendor, if the mail server supports OAuth2 + # oauth_vendor: GOOGLE + + incoming: + # Defaults to port 993 for IMAPS if no port is + # specified on the URL + server: imaps://imap.gmail.com + + outgoing: + # Defaults to port 465 for SMTPS if no port is + # specified on the URL + server: smtps://smtp.gmail.com + + monitor_folders: + - INBOX + - Sent + + :param accounts: List of available mailboxes/accounts. + :param poll_interval: How often the plugin should poll for new messages + (default: 60 seconds). + :param timeout: Timeout for the mail server connection (default: 20 + seconds). + """ + assert accounts, 'No mail accounts configured' + + super().__init__(poll_interval=poll_interval, **kwargs) + self.timeout = timeout + self.accounts = self._parse_accounts(accounts) + self._accounts_by_name = {acc.name: acc for acc in self.accounts} + self._default_account = next( + (acc for acc in self.accounts if acc.default), self.accounts[0] + ) + + self._status = AccountsStatus() + self._db_lock = RLock() + self.workdir = os.path.join(Config.get_workdir(), 'mail') + self._status_file = os.path.join(self.workdir, 'status.json') + + # Configure/sync db + pathlib.Path(self.workdir).mkdir(parents=True, exist_ok=True, mode=0o750) + self._load_status() + + def _load_status(self): + with self._db_lock: + try: + with open(self._status_file) as f: + self._status = AccountsStatus.read(f) + except FileNotFoundError: + self._status = AccountsStatus() + except Exception as e: + self.logger.warning( + 'Could not load mail status from %s: %s', self._status_file, e + ) + self._status = AccountsStatus() + + def _get_account(self, account: Optional[AccountType] = None) -> Account: + if isinstance(account, Account): + return account + + if isinstance(account, int): + account -= 1 + assert ( + 0 <= account < len(self.accounts) + ), f'Invalid account index {account} (valid range: 1-{len(self.accounts)})' + + return self.accounts[account] + + if isinstance(account, str): + acc = self._accounts_by_name.get(account) + assert acc, f'No account found with name "{account}"' + return acc + + return self._default_account + + def _get_in_plugin(self, account: Optional[AccountType] = None) -> MailInPlugin: + acc = self._get_account(account) + assert acc.incoming, f'No incoming configuration found for account "{acc.name}"' + return acc.incoming + + def _get_out_plugin(self, account: Optional[AccountType] = None) -> MailOutPlugin: + acc = self._get_account(account) + assert acc.outgoing, f'No outgoing configuration found for account "{acc.name}"' + return acc.outgoing + + def _parse_accounts(self, accounts: List[Dict[str, Any]]) -> List[Account]: + ret = [] + for i, acc in enumerate(accounts): + idx = i + 1 + name = acc.pop('name') if 'name' in acc else f'Account #{idx}' + incoming_conf = acc.pop('incoming') + outgoing_conf = acc.pop('outgoing') + monitor_folders = acc.pop('monitor_folders', []) + + assert ( + incoming_conf or outgoing_conf + ), f'No incoming/outgoing configuration specified for account "{name}"' + + if monitor_folders: + assert incoming_conf, ( + f'Cannot monitor folders for account "{name}" ' + 'without incoming configuration' + ) + + acc['poll_interval'] = self.poll_interval + acc['timeout'] = acc.get('timeout', self.timeout) + ret.append( + Account.build( + name=name, + incoming=incoming_conf, + outgoing=outgoing_conf, + monitor_folders=monitor_folders, + **acc, + ) + ) + + return ret + + @property + def _monitored_accounts(self): + return [acc for acc in self.accounts if acc.monitor_folders] + + @property + def _account_by_name(self) -> Dict[str, Account]: + return {acc.name: acc for acc in self.accounts} + + @staticmethod + def _check_thread(plugin: MailInPlugin, folder: str, results: FolderStatus): + results[MailFlagType.UNREAD] = { + msg.id: msg for msg in plugin.search_unseen_messages(folder=folder) } + results[MailFlagType.FLAGGED] = { + msg.id: msg for msg in plugin.search_flagged_messages(folder=folder) + } -class MailPlugin(Plugin): - """ - Base class for mail plugins. - """ + def _check_mailboxes(self) -> AccountsStatus: + # Workers indexed by (account_name, folder) -> thread + workers = {} + status = AccountsStatus() - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.server_info: Optional[ServerInfo] = None + for account in self._monitored_accounts: + for folder in account.monitor_folders or []: + worker = Thread( + target=self._check_thread, + name=f'check-mailbox-{account.name}-{folder}', + kwargs={ + 'plugin': account.incoming, + 'results': status[account.name][folder], + 'folder': folder, + }, + ) + + worker.start() + workers[account.name, folder] = worker + + wait_start = time.time() + for worker_key, worker in workers.items(): + account = self._account_by_name[worker_key[0]] + if not account.incoming: + continue + + # The timeout should be the time elapsed since wait_start + the configured timeout + timeout = max( + 0, account.incoming.server.timeout - (time.time() - wait_start) + ) + worker.join(timeout=timeout) + if worker.is_alive(): + self.logger.warning('Timeout while polling account %s', account.name) + + return status + + @action + def get_folders( + self, + folder: str = '', + pattern: str = '*', + account: Optional[AccountType] = None, + ) -> List[Dict[str, str]]: + r""" + Get the list of all the folders hosted on the server or those matching a pattern. + + :param folder: Base folder (default: root). + :param pattern: Pattern to search (default: None). + :param account: Account name or index (default: default account). + :return: Example: + + .. code-block:: json + + [ + { + "name": "INBOX", + "flags": "\\Noinferiors", + "delimiter": "/" + }, + { + "name": "Archive", + "flags": "\\Noinferiors", + "delimiter": "/" + }, + { + "name": "Spam", + "flags": "\\Noinferiors", + "delimiter": "/" + } + ] - @staticmethod - def _get_password(password: Optional[str] = None, password_cmd: Optional[str] = None) -> Optional[str]: """ - Get the password either from a provided string or from a password command. + return self._get_in_plugin(account).get_folders(folder=folder, pattern=pattern) + + @action + def get_sub_folders( + self, + folder: str = '', + pattern: str = '*', + account: Optional[AccountType] = None, + ) -> List[Dict[str, str]]: + r""" + Get the list of all the sub-folders hosted on the server or those matching a pattern. + + :param folder: Base folder (default: root). + :param pattern: Pattern to search (default: None). + :param account: Account name or index (default: default account). + :return: Example: + + .. code-block:: json + + [ + { + "name": "INBOX", + "flags": "\\Noinferiors", + "delimiter": "/" + }, + { + "name": "Archive", + "flags": "\\Noinferiors", + "delimiter": "/" + }, + { + "name": "Spam", + "flags": "\\Noinferiors", + "delimiter": "/" + } + ] + """ - if not password_cmd: - return password - - proc = subprocess.Popen(['sh', '-c', password_cmd], stdout=subprocess.PIPE) - password = proc.communicate()[0].decode() - return password or None - - @staticmethod - def _get_path(path: str) -> str: - return os.path.abspath(os.path.expanduser(path)) - - def _get_server_info(self, server: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, - password: Optional[str] = None, password_cmd: Optional[str] = None, - ssl: Optional[bool] = None, keyfile: Optional[str] = None, certfile: Optional[str] = None, - access_token: Optional[str] = None, oauth_mechanism: Optional[str] = None, - oauth_vendor: Optional[str] = None, default_port: Optional[int] = None, - default_ssl_port: Optional[int] = None, timeout: Optional[int] = None, **kwargs) \ - -> ServerInfo: - if not port: - port = default_ssl_port if ssl else default_port - - info = ServerInfo(server=server, port=port, username=username, - password=self._get_password(password, password_cmd), ssl=ssl, keyfile=keyfile, - certfile=certfile, access_token=access_token, oauth_mechanism=oauth_mechanism, - oauth_vendor=oauth_vendor, timeout=timeout) - - if server: - return info - - if self.server_info: - assert self.server_info.server, 'No server specified' - return self.server_info - - return info - - -class MailInPlugin(MailPlugin): - """ - Base class for mail in plugins. - """ + return self._get_in_plugin(account).get_sub_folders( + folder=folder, pattern=pattern + ) @action - def get_folders(self) -> list: - raise NotImplementedError() + def search( + self, + criteria: Union[str, List[str]] = 'ALL', + folder: str = 'INBOX', + attributes: Optional[List[str]] = None, + account: Optional[AccountType] = None, + ) -> List[Mail]: + """ + Search for messages on the server that fit the specified criteria. + + If no criteria is specified, then all the messages in the folder will + be returned. + + :param criteria: It should be a sequence of one or more criteria items. + Each criterion item may be either unicode or bytes (default: + ``ALL``). Example values:: + + ['UNSEEN'] + ['FROM', 'me@example.com'] + ['TO', 'me@example.com'] + ['SMALLER', 500] + ['NOT', 'DELETED'] + ['TEXT', 'foo bar', 'FLAGGED', 'SUBJECT', 'baz'] + ['SINCE', '2020-03-14T12:13:45+00:00'] + + It is also possible (but not recommended) to pass the combined + criteria as a single string. In this case IMAPClient won't perform + quoting, allowing lower-level specification of criteria. Examples + of this style:: + + 'UNSEEN' + 'SMALLER 500' + 'NOT DELETED' + 'TEXT "foo bar" FLAGGED SUBJECT "baz"' + 'SINCE 03-Apr-2005' + + To support complex search expressions, criteria lists can be + nested. The following will match messages that are both not flagged + and do not have "foo" in the subject:: + + ['NOT', ['SUBJECT', 'foo', 'FLAGGED']] + + See :rfc:`3501#section-6.4.4` for more details. + + :param folder: Folder to search (default: ``INBOX``). + :param attributes: Attributes that should be retrieved, according to + `RFC 3501 `_ + (default: ``ALL`` = ``[FLAGS INTERNALDATE RFC822.SIZE ENVELOPE]``). + Note that ``BODY`` will be ignored if specified here for + performance reasons - use :meth:`.get_message` if you want to get + the full content of a message known its ID from :meth:`.search`. + + :param account: Account name or index (default: default account). + :return: List of messages matching the criteria. Example: + + .. code-block:: json + + [ + { + "id": 702, + "seq": 671, + "flags": [ + "nonjunk" + ], + "internal_date": "2020-08-30T00:31:52+00:00", + "size": 2908738, + "bcc": {}, + "cc": {}, + "date": "2020-08-30T00:31:52+00:00", + "from": { + "test123@gmail.com": { + "name": "A test", + "route": null, + "email": "test123@gmail.com" + } + }, + "message_id": "", + "in_reply_to": "", + "reply_to": {}, + "sender": { + "test123@gmail.com": { + "name": "A test", + "route": null, + "email": "test123@gmail.com" + } + }, + "subject": "Test email", + "to": { + "me@gmail.com": { + "name": null, + "route": null, + "email": "me@gmail.com" + } + } + } + ] + + """ + return self._get_in_plugin(account).search( + criteria=criteria, folder=folder, attributes=attributes + ) @action - def get_sub_folders(self) -> list: - raise NotImplementedError() + def search_unseen_messages( + self, folder: str = 'INBOX', account: Optional[AccountType] = None + ) -> List[Mail]: + """ + Shortcut for :meth:`.search` that returns only the unread messages. + """ + return self._get_in_plugin(account).search_unseen_messages(folder=folder) @action - def search(self, criteria: str, directory: Optional[str] = None) -> list: - raise NotImplementedError() + def search_flagged_messages( + self, folder: str = 'INBOX', account: Optional[AccountType] = None + ) -> List[Mail]: + """ + Shortcut for :meth:`.search` that returns only the flagged/starred messages. + """ + return self._get_in_plugin(account).search_flagged_messages(folder=folder) @action - def search_unseen_messages(self, directory: Optional[str] = None) -> list: - raise NotImplementedError() - - def search_flagged_messages(self, folder: str = 'INBOX', **connect_args) -> list: - raise NotImplementedError() + def search_starred_messages( + self, folder: str = 'INBOX', account: Optional[AccountType] = None + ) -> List[Mail]: + """ + Shortcut for :meth:`.search` that returns only the starred messages. + """ + return self._get_in_plugin(account).search_starred_messages(folder=folder) @action - def get_message(self, id) -> dict: - raise NotImplementedError() + def sort( + self, + folder: str = 'INBOX', + sort_criteria: Union[str, List[str]] = 'ARRIVAL', + criteria: Union[str, List[str]] = 'ALL', + account: Optional[AccountType] = None, + ) -> List[int]: + """ + Return a list of message ids from the currently selected folder, sorted + by ``sort_criteria`` and optionally filtered by ``criteria``. Note that + SORT is an extension to the IMAP4 standard, so it may not be supported + by all IMAP servers. + :param folder: Folder to be searched (default: ``INBOX``). + :param sort_criteria: It may be a sequence of strings or a single + string. IMAPClient will take care any required conversions. Valid + *sort_criteria* values:: -class MailOutPlugin(MailPlugin): - """ - Base class for mail out plugins. - """ + .. code-block:: python - def send_message(self, message: Message, **connect_args): - raise NotImplementedError() + ['ARRIVAL'] + ['SUBJECT', 'ARRIVAL'] + 'ARRIVAL' + 'REVERSE SIZE' - @staticmethod - def _file_to_part(file: str) -> MIMEBase: - _type, _subtype, _type_class = 'application', 'octet-stream', MIMEApplication - mime_type, _sub_subtype = guess_type(file) - - if mime_type: - _type, _subtype = mime_type.split('/') - if _sub_subtype: - _subtype += ';' + _sub_subtype - - if _type == 'application': - _type_class = MIMEApplication - elif _type == 'audio': - _type_class = MIMEAudio - elif _type == 'image': - _type_class = MIMEImage - elif _type == 'text': - _type_class = MIMEText - - args = {} - if _type_class != MIMEText: - mode = 'rb' - args['Name'] = os.path.basename(file) - else: - mode = 'r' - - with open(file, mode) as f: - return _type_class(f.read(), _subtype, **args) - - @classmethod - def create_message(cls, to: Union[str, List[str]], from_: Optional[str] = None, - cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None, - subject: str = '', body: str = '', body_type: str = 'plain', - attachments: Optional[List[str]] = None, headers: Optional[Dict[str, str]] = None) -> Message: - assert from_, 'from/from_ field not specified' - - body = MIMEText(body, body_type) - if attachments: - msg = MIMEMultipart() - msg.attach(body) - - for attachment in attachments: - attachment = os.path.abspath(os.path.expanduser(attachment)) - assert os.path.isfile(attachment), 'No such file: {}'.format(attachment) - part = cls._file_to_part(attachment) - part['Content-Disposition'] = 'attachment; filename="{}"'.format(os.path.basename(attachment)) - msg.attach(part) - else: - msg = body - - msg['From'] = from_ - msg['To'] = ', '.join(to) if isinstance(to, List) else to - msg['Cc'] = ', '.join(cc) if cc else '' - msg['Bcc'] = ', '.join(bcc) if bcc else '' - msg['Subject'] = subject - - if headers: - for name, value in headers.items(): - msg.add_header(name, value) - - return msg + :param criteria: Optional filter for the messages, as specified in + :meth:`.search`. + :param account: Account name or index (default: default account). + :return: A list of message IDs that fit the criteria. + """ + return self._get_in_plugin(account).sort( + folder=folder, sort_criteria=sort_criteria, criteria=criteria + ) @action - def send(self, to: Union[str, List[str]], from_: Optional[str] = None, - cc: Optional[Union[str, List[str]]] = None, bcc: Optional[Union[str, List[str]]] = None, - subject: str = '', body: str = '', body_type: str = 'plain', attachments: Optional[List[str]] = None, - headers: Optional[Dict[str, str]] = None, **connect_args): + def get_message( + self, + id: int, # pylint: disable=redefined-builtin + folder: str = 'INBOX', + account: Optional[AccountType] = None, + with_body: bool = True, + ) -> Mail: + r""" + Get the full content of a message given the ID returned by :meth:`.search`. + + :param id: Message ID. + :param folder: Folder name (default: ``INBOX``). + :param account: Account name or index (default: default account). + :param with_body: If set then the body/payload will be included in the response (default: ``True``). + :return: A message in the same format as :meth:`.search`, with an added + ``payload`` attribute containing the body/payload. Example response: + + .. code-block:: json + + { + "id": 123, + "date": "2024-01-13T21:04:50", + "size": 3833, + "from": { + "you@example.com": { + "name": "Me", + "route": null, + "email": "you@example.com" + } + }, + "to": { + "me@example.com": { + "name": "Me", + "route": null, + "email": "me@example.com" + } + }, + "cc": { + "they@example.com": { + "name": "They", + "route": null, + "email": "they@example.com" + } + }, + "bcc": { + "boss@example.com": { + "name": "Boss", + "route": null, + "email": "boss@example.com" + } + }, + "subject": "Test email", + "content": { + "headers": { + "Return-Path": "", + "Delivered-To": "me@example.com", + "Date": "Sat, 13 Jan 2024 20:04:50 +0000", + "To": "Me ", + "Cc": "They ", + "Bcc": "Boss ", + "Subject": "Test email", + "Message-ID": "<0123456789@client>", + "MIME-Version": "1.0", + "Content-Type": "multipart/mixed;\r\n boundary=\"0123456789\"" + }, + "body": "This is the email body", + "attachments": [ + { + "filename": "signature.asc", + "headers": { + "Content-Type": "application/pgp-signature; name=signature.asc", + "Content-Transfer-Encoding": "base64", + "Content-Disposition": "attachment; filename=signature.asc" + }, + "body": "-----BEGIN PGP SIGNATURE-----\r\n\r\n....\r\n\r\n-----END PGP SIGNATURE-----\r\n" + }, + { + "filename": "image.jpg", + "body": "/9j/4gIcSUNDX1BST0ZJTEUAA...", + "headers": { + "Content-Type": "image/jpeg; Name=\"image.jpg\"", + "MIME-Version": "1.0", + "Content-Transfer-Encoding": "base64", + "Content-Disposition": "attachment; filename=\"profile_pic.jpg\"" + } + } + ] + }, + "seq": 123, + "internal_date": "2024-01-13T21:05:12", + "message_id": "<0123456789@client>", + "reply_to": { + "you@example.com": { + "name": "You", + "route": null, + "email": "you@example.com" + } + }, + "sender": { + "you@example.com": { + "name": "You", + "route": null, + "email": "you@example.com" + } + } + } + + """ + return self._get_in_plugin(account).get_message( + id=id, folder=folder, with_body=with_body + ) + + @action + def get_messages( + self, + ids: Collection[int], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + with_body: bool = True, + ) -> Dict[int, Mail]: + """ + Get the full content of a list of messages given their IDs returned by :meth:`.search`. + + :param ids: IDs of the messages to retrieve. + :param folder: Folder name (default: ``INBOX``). + :param account: Account name or index (default: default account). + :param with_body: If set then the body/payload will be included in the response + (default: ``True``). + :return: A dictionary in the format ``{id -> msg}``, where ``msg`` is in the same + format as :meth:`.search`, with an added ``payload`` attribute containing the + body/payload. See :meth:`.get_message` for an example message format. + """ + return self._get_in_plugin(account).get_messages( + *ids, folder=folder, with_body=with_body + ) + + @action + def create_folder(self, folder: str, account: Optional[AccountType] = None): + """ + Create a folder on the server. + + :param folder: Folder name. + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).create_folder(folder=folder) + + @action + def rename_folder( + self, old_name: str, new_name: str, account: Optional[AccountType] = None + ): + """ + Rename a folder on the server. + + :param old_name: Previous name + :param new_name: New name + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).rename_folder( + old_name=old_name, new_name=new_name + ) + + @action + def delete_folder(self, folder: str, account: Optional[AccountType] = None): + """ + Delete a folder from the server. + + :param folder: Folder name. + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).delete_folder(folder=folder) + + @action + def add_flags( + self, + messages: List[int], + flags: Union[str, List[str]], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Add a set of flags to the specified set of message IDs. + + :param messages: List of message IDs. + :param flags: List of flags to be added. Examples: + + .. code-block:: python + + ['Flagged'] + ['Seen', 'Deleted'] + ['Junk'] + + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).add_flags( + messages=messages, flags=flags, folder=folder + ) + + @action + def set_flags( + self, + messages: List[int], + flags: Union[str, List[str]], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Set a set of flags to the specified set of message IDs. + + :param messages: List of message IDs. + :param flags: List of flags to be added. Examples: + + .. code-block:: python + + ['Flagged'] + ['Seen', 'Deleted'] + ['Junk'] + + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).set_flags( + messages=messages, flags=flags, folder=folder + ) + + @action + def remove_flags( + self, + messages: List[int], + flags: Union[str, List[str]], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Remove a set of flags to the specified set of message IDs. + + :param messages: List of message IDs. + :param flags: List of flags to be added. Examples: + + .. code-block:: python + + ['Flagged'] + ['Seen', 'Deleted'] + ['Junk'] + + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).remove_flags( + messages=messages, flags=flags, folder=folder + ) + + @action + def flag_messages( + self, + messages: List[int], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Add a flag/star to the specified set of message IDs. + + :param messages: List of message IDs. + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).flag_messages( + messages=messages, folder=folder + ) + + @action + def unflag_messages( + self, + messages: List[int], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Remove a flag/star from the specified set of message IDs. + + :param messages: List of message IDs. + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).unflag_messages( + messages=messages, folder=folder + ) + + @action + def flag_message( + self, message: int, folder: str = 'INBOX', account: Optional[AccountType] = None + ): + """ + Add a flag/star to the specified set of message ID. + + :param message: Message ID. + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).flag_message(message=message, folder=folder) + + @action + def unflag_message( + self, message: int, folder: str = 'INBOX', account: Optional[AccountType] = None + ): + """ + Remove a flag/star from the specified set of message ID. + + :param message: Message ID. + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).unflag_message( + message=message, folder=folder + ) + + @action + def delete_messages( + self, + messages: List[int], + folder: str = 'INBOX', + expunge: bool = True, + account: Optional[AccountType] = None, + ): + """ + Set a specified set of message IDs as deleted. + + :param messages: List of message IDs. + :param folder: IMAP folder (default: ``INBOX``). + :param expunge: If set then the messages will also be expunged from the + folder, otherwise they will only be marked as deleted (default: + ``True``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).delete_messages( + messages=messages, folder=folder, expunge=expunge + ) + + @action + def restore_messages( + self, + messages: List[int], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Remove the ``Deleted`` flag from the specified set of message IDs. + + :param messages: List of message IDs. + :param folder: IMAP folder (default: ``INBOX``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).restore_messages( + messages=messages, folder=folder + ) + + @action + def copy_messages( + self, + messages: List[int], + destination: str, + source: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Copy a set of messages IDs from a folder to another. + + :param messages: List of message IDs. + :param source: Source folder. + :param destination: Destination folder. + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).copy_messages( + messages=messages, dest_folder=destination, source_folder=source + ) + + @action + def move_messages( + self, + messages: List[int], + destination: str, + source: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + Move a set of messages IDs from a folder to another. + + :param messages: List of message IDs. + :param source: Source folder. + :param destination: Destination folder. + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).move_messages( + messages=messages, dest_folder=destination, source_folder=source + ) + + @action + def expunge_messages( + self, + messages: List[int], + folder: str = 'INBOX', + account: Optional[AccountType] = None, + ): + """ + When ``messages`` is not set, remove all the messages from ``folder`` + marked as ``Deleted``. + + :param folder: IMAP folder (default: ``INBOX``). + :param messages: List of message IDs to expunge (default: all those + marked as ``Deleted``). + :param account: Account name or index (default: default account). + """ + return self._get_in_plugin(account).expunge_messages( + folder=folder, messages=messages + ) + + @action + def send( + self, + to: Union[str, List[str]], + from_: Optional[str] = None, + cc: Optional[Union[str, List[str]]] = None, + bcc: Optional[Union[str, List[str]]] = None, + subject: str = '', + body: str = '', + body_type: str = 'plain', + attachments: Optional[List[str]] = None, + headers: Optional[Dict[str, str]] = None, + account: Optional[AccountType] = None, + **kwargs, + ): """ Send an email through the specified SMTP sender. :param to: Receiver(s), as comma-separated strings or list. - :param from_: Sender email address (``from`` is also supported outside of Python contexts). + :param from_: Sender email address (``from`` is also supported). :param cc: Carbon-copy addresses, as comma-separated strings or list :param bcc: Blind carbon-copy addresses, as comma-separated strings or list :param subject: Mail subject. @@ -227,15 +979,124 @@ class MailOutPlugin(MailPlugin): :param body_type: Mail body type, as a subtype of ``text/`` (e.g. ``html``) (default: ``plain``). :param attachments: List of attachment files to send. :param headers: Key-value map of headers to be added. - :param connect_args: Parameters for ``.connect()``, if you want to override the default server configuration. + :param account: Account name/index to be used (default: default account). """ - if not from_ and 'from' in connect_args: - from_ = connect_args.pop('from') + plugin = self._get_out_plugin(account) + headers = {k.lower(): v for k, v in (headers or {}).items()} + cc = ([cc] if isinstance(cc, str) else cc) or [] + bcc = ([bcc] if isinstance(bcc, str) else bcc) or [] + return plugin.send( + to=to, + from_=( + from_ + or kwargs.get('from') + or (headers or {}).get('from') + or plugin.account.display_name + or plugin.account.username + ), + cc=cc, + bcc=bcc, + subject=subject, + body=body, + body_type=body_type, + attachments=attachments, + headers=headers, + ) - msg = self.create_message(to=to, from_=from_, cc=cc, bcc=bcc, subject=subject, body=body, body_type=body_type, - attachments=attachments, headers=headers) + @staticmethod + def _process_account_changes( + account: str, cur_status: AccountsStatus, new_status: AccountsStatus + ) -> AccountFolderChanges: + folders = new_status.get(account) or {} + mail_flag_changed: Dict[str, Dict[MailFlagType, Dict[int, bool]]] = defaultdict( + lambda: defaultdict(lambda: defaultdict(bool)) + ) - return self.send_message(msg, **connect_args) + for folder, folder_status in folders.items(): + for flag, new_mail in folder_status.items(): + cur_mail = (cur_status.get(account) or {}).get(folder, {}).get(flag, {}) + cur_mail_keys = set(map(int, cur_mail.keys())) + new_mail_keys = set(map(int, new_mail.keys())) + mail_flag_added_keys = new_mail_keys - cur_mail_keys + mail_flag_removed_keys = cur_mail_keys - new_mail_keys + mail_flag_changed[folder][flag].update( + { + **{msg_id: True for msg_id in mail_flag_added_keys}, + **{msg_id: False for msg_id in mail_flag_removed_keys}, + } + ) + + return mail_flag_changed + + def _generate_account_events( + self, account: str, msgs: Dict[int, Mail], folder_changes: AccountFolderChanges + ): + for folder, changes in folder_changes.items(): + for flag, flag_changes in changes.items(): + for msg_id, flag_added in flag_changes.items(): + msg = msgs.get(msg_id) + if not msg: + continue + + if flag == MailFlagType.UNREAD: + evt_type = UnseenMailEvent if flag_added else SeenMailEvent + elif flag == MailFlagType.FLAGGED: + evt_type = ( + FlaggedMailEvent if flag_added else UnflaggedMailEvent + ) + else: + continue + + self._bus.post( + evt_type( + account=account, + folder=folder, + message=msg, + ) + ) + + def _generate_events(self, cur_status: AccountsStatus, new_status: AccountsStatus): + for account in new_status: + mail_flag_changed = self._process_account_changes( + account, cur_status, new_status + ) + + msg_ids = { + msg_id + for _, changes in mail_flag_changed.items() + for _, flag_changes in changes.items() + for msg_id, _ in flag_changes.items() + } + + if not msg_ids: + continue + + acc = self._get_account(account) + if not acc.incoming: + continue + + msgs = acc.incoming.get_messages(*msg_ids, with_body=False) + self._generate_account_events(account, msgs, mail_flag_changed) + + def _update_status(self, status: AccountsStatus): + self._status = status + with open(self._status_file, 'w') as f: + status.write(f) + + def loop(self): + cur_status = self._status.copy() + new_status = self._check_mailboxes() + self._generate_events(cur_status, new_status) + self._update_status(new_status) + + def main(self): + while not self.should_stop(): + try: + self.loop() + except Exception as e: + self.logger.exception(e) + finally: + self.wait_stop(self.poll_interval) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_account.py b/platypush/plugins/mail/_account.py new file mode 100644 index 00000000..79d43e95 --- /dev/null +++ b/platypush/plugins/mail/_account.py @@ -0,0 +1,106 @@ +from dataclasses import dataclass +from typing import Any, Collection, Dict, Optional + +from ._model import AccountConfig +from ._plugin import BaseMailPlugin, MailInPlugin, MailOutPlugin + + +@dataclass +class Account: + """ + Models a mail account. + """ + + name: str + poll_interval: float + display_name: Optional[str] = None + incoming: Optional[MailInPlugin] = None + outgoing: Optional[MailOutPlugin] = None + monitor_folders: Optional[Collection[str]] = None + default: bool = False + last_check: Optional[float] = None + + @classmethod + def build( + cls, + name: str, + timeout: float, + poll_interval: float, + display_name: Optional[str] = None, + incoming: Optional[Dict[str, Any]] = None, + outgoing: Optional[Dict[str, Any]] = None, + monitor_folders: Optional[Collection[str]] = None, + username: Optional[str] = None, + password: Optional[str] = None, + password_cmd: Optional[str] = None, + keyfile: Optional[str] = None, + certfile: Optional[str] = None, + access_token: Optional[str] = None, + oauth_mechanism: Optional[str] = None, + oauth_vendor: Optional[str] = None, + default: bool = False, + last_check: Optional[float] = None, + ) -> 'Account': + account_args = { + 'username': username, + 'password': password, + 'password_cmd': password_cmd, + 'access_token': access_token, + 'oauth_mechanism': oauth_mechanism, + 'oauth_vendor': oauth_vendor, + 'display_name': display_name, + } + + in_plugin = None + if incoming: + server = incoming.pop('server', None) + assert server, 'No server provided for incoming mail for account "{name}"' + + keyfile = incoming.pop('keyfile', keyfile) + certfile = incoming.pop('certfile', certfile) + account = AccountConfig(**{**account_args, **incoming}) + in_plugin = BaseMailPlugin.build( + server=server, + account=account, + timeout=timeout, + keyfile=keyfile, + certfile=certfile, + ) + + assert isinstance( + in_plugin, MailInPlugin + ), 'Incoming mail plugin expected for account "{name}"' + + out_plugin = None + if outgoing: + server = outgoing.pop('server', None) + assert server, 'No server provided for outgoing mail for account "{name}"' + + keyfile = outgoing.pop('keyfile', keyfile) + certfile = outgoing.pop('certfile', certfile) + account = AccountConfig(**{**account_args, **outgoing}) + out_plugin = BaseMailPlugin.build( + server=server, + account=account, + timeout=timeout, + keyfile=keyfile, + certfile=certfile, + ) + + assert isinstance( + out_plugin, MailOutPlugin + ), 'Outgoing mail plugin expected for account "{name}"' + + return cls( + name=name, + display_name=display_name, + incoming=in_plugin, + outgoing=out_plugin, + monitor_folders=monitor_folders, + poll_interval=poll_interval, + default=default, + last_check=last_check, + ) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_model/__init__.py b/platypush/plugins/mail/_model/__init__.py new file mode 100644 index 00000000..ef84e079 --- /dev/null +++ b/platypush/plugins/mail/_model/__init__.py @@ -0,0 +1,14 @@ +from ._config import AccountConfig, ServerConfig +from ._mail import FolderStatus, Mail, MailFlagType, AccountsStatus +from ._transport import TransportEncryption + + +__all__ = [ + 'AccountConfig', + 'FolderStatus', + 'Mail', + 'MailFlagType', + 'AccountsStatus', + 'ServerConfig', + 'TransportEncryption', +] diff --git a/platypush/plugins/mail/_model/_config/__init__.py b/platypush/plugins/mail/_model/_config/__init__.py new file mode 100644 index 00000000..be22b32d --- /dev/null +++ b/platypush/plugins/mail/_model/_config/__init__.py @@ -0,0 +1,8 @@ +from ._account import AccountConfig +from ._server import ServerConfig + + +__all__ = [ + 'AccountConfig', + 'ServerConfig', +] diff --git a/platypush/plugins/mail/_model/_config/_account.py b/platypush/plugins/mail/_model/_config/_account.py new file mode 100644 index 00000000..c24ce3e1 --- /dev/null +++ b/platypush/plugins/mail/_model/_config/_account.py @@ -0,0 +1,43 @@ +import subprocess + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class AccountConfig: + """ + Model for a mail account configuration. + """ + + username: str + password: Optional[str] = None + password_cmd: Optional[str] = None + access_token: Optional[str] = None + oauth_mechanism: Optional[str] = None + oauth_vendor: Optional[str] = None + display_name: Optional[str] = None + + def __post_init__(self): + """ + Ensure that at least one of password, password_cmd or access_token is provided. + """ + assert ( + self.password or self.password_cmd or self.access_token + ), 'No password, password_cmd or access_token provided' + + def get_password(self) -> str: + """ + Get the password either from a provided string or from a password command. + """ + if self.password_cmd: + with subprocess.Popen( + ['sh', '-c', self.password_cmd], stdout=subprocess.PIPE + ) as proc: + return proc.communicate()[0].decode() + + assert self.password, 'No password provided' + return self.password + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_model/_config/_server.py b/platypush/plugins/mail/_model/_config/_server.py new file mode 100644 index 00000000..56f48689 --- /dev/null +++ b/platypush/plugins/mail/_model/_config/_server.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass +from typing import Optional + +from .._transport import TransportEncryption + + +@dataclass +class ServerConfig: + """ + Configuration for a mail server. + """ + + server: str + port: int + encryption: TransportEncryption + timeout: float + keyfile: Optional[str] = None + certfile: Optional[str] = None + domain: Optional[str] = None + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_model/_mail.py b/platypush/plugins/mail/_model/_mail.py new file mode 100644 index 00000000..edc1d544 --- /dev/null +++ b/platypush/plugins/mail/_model/_mail.py @@ -0,0 +1,254 @@ +import base64 +import email +import logging +import json +from email.message import Message + +from collections import defaultdict +from datetime import datetime +from enum import Enum +from typing import Any, Dict, IO, List, Optional, Union + +from platypush.message import JSONAble + +logger = logging.getLogger(__name__) + +_text_types = { + 'text/plain', + 'text/html', + 'text/rtf', + 'text/enriched', + 'text/markdown', + 'application/rtf', +} + + +class Mail(JSONAble): # pylint: disable=too-few-public-methods + """ + Model for a mail message. + """ + + def __init__( + self, + id: int, # pylint: disable=redefined-builtin + date: datetime, + size: int, + from_: Optional[Union[Dict[str, str], List[str]]] = None, + to: Optional[Union[Dict[str, str], List[str]]] = None, + cc: Optional[Union[Dict[str, str], List[str]]] = None, + bcc: Optional[Union[Dict[str, str], List[str]]] = None, + subject: str = '', + content: Optional[Any] = None, + **kwargs, + ): + self.id = id + self.date = date + self.size = size + self.from_ = from_ or kwargs.pop('from', None) + self.to = to + self.cc = cc or [] + self.bcc = bcc or [] + self.subject = subject + self._content = content + self.args = kwargs + + @staticmethod + def _parse_body(msg: Message) -> str: + body = '' + if msg.is_multipart(): + for part in msg.walk(): + if ( + part.get_content_type() in _text_types + and + # skip any text/plain (txt) attachments + 'attachment' not in part.get('Content-Disposition', '') + ): + body = bytes(part.get_payload(decode=True)).decode() + break + else: + body = bytes(msg.get_payload(decode=True)).decode() + + return body + + @staticmethod + def _parse_attachments(msg: Message) -> List[dict]: + attachments = [] + if msg.is_multipart(): + for part in msg.walk(): + if 'attachment' not in part.get('Content-Disposition', ''): + continue + + raw_payload = bytes(part.get_payload(decode=True)) + try: + # Try to decode it as a string + payload = raw_payload.decode() + except UnicodeDecodeError: + # Otherwise, return a JSON-encoded string + payload = base64.b64encode(raw_payload).decode() + + attachments.append( + { + 'filename': part.get_filename(), + 'headers': dict(part), + 'body': payload, + } + ) + + return attachments + + @classmethod + def _parse_payload(cls, payload: Union[bytes, bytearray]) -> dict: + msg = email.message_from_bytes(payload) + return { + 'headers': dict(msg), + 'body': cls._parse_body(msg), + 'attachments': cls._parse_attachments(msg), + } + + @property + def content(self): + if isinstance(self._content, (bytes, bytearray)): + try: + return self._parse_payload(self._content) + except Exception as e: + logger.warning( + 'Error while parsing payload for message ID %s: %s', self.id, e + ) + logger.exception(e) + + return self._content + + def to_json(self) -> dict: + return { + 'id': self.id, + 'date': self.date, + 'size': self.size, + 'from': self.from_, + 'to': self.to, + 'cc': self.cc, + 'bcc': self.bcc, + 'subject': self.subject, + 'content': self.content, + **self.args, + } + + +class MailFlagType(Enum): + """ + Types of supported mail flags. + """ + + FLAGGED = 'flagged' + UNREAD = 'unread' + + +FolderStatus = Dict[MailFlagType, Dict[int, Mail]] +FoldersStatus = Dict[str, FolderStatus] + + +class AccountsStatus: + """ + Models the status of all the monitored mailboxes. + """ + + def __init__(self): + self._dict: Dict[str, FoldersStatus] = defaultdict( + lambda: defaultdict(lambda: {evt: {} for evt in MailFlagType}) + ) + + class Serializer(json.JSONEncoder): + def default(self, o): + if isinstance(o, datetime): + return o.isoformat() + if isinstance(o, MailFlagType): + return o.value + return json.JSONEncoder.default(self, o) + + class Deserializer(json.JSONDecoder): + def __init__(self): + super().__init__(object_hook=self._hook) + + @staticmethod + def _hook(o): + if 'date' in o: + o['date'] = datetime.fromisoformat(o['date']) + if 'flag' in o: + o['flag'] = MailFlagType(o['flag']) + return o + + def __getitem__(self, item: str) -> FoldersStatus: + return self._dict[item] + + def __setitem__(self, key: str, value: FoldersStatus): + self._dict[key] = value + + def __delitem__(self, key: str) -> None: + del self._dict[key] + + def __iter__(self): + return iter(self._dict) + + def __len__(self) -> int: + return len(self._dict) + + def __contains__(self, item) -> bool: + return item in self._dict + + def __str__(self): + return json.dumps(self._dict, cls=self.Serializer) + + def __repr__(self): + return self.__str__() + + def items(self): + return self._dict.items() + + def copy(self) -> 'AccountsStatus': + obj = AccountsStatus() + obj._dict.update( + { + account: { + folder: {MailFlagType(evt): msgs for evt, msgs in statuses.items()} + for folder, statuses in folders.items() + } + for account, folders in self._dict.items() + } + ) + return obj + + def get(self, key: str, default=None) -> Optional[FoldersStatus]: + return self._dict.get(key, default) + + @classmethod + def read(cls, f: IO) -> 'AccountsStatus': + obj = cls() + obj._dict.update( + { + account: { + folder: {MailFlagType(evt): msgs for evt, msgs in statuses.items()} + for folder, statuses in folders.items() + } + for account, folders in json.load(f, cls=cls.Deserializer).items() + } + ) + return obj + + def write(self, f: IO): + f.write( + json.dumps( + { + account: { + folder: { + evt.value: {msg_id: {} for msg_id in msgs} + for evt, msgs in statuses.items() + } + for folder, statuses in folders.items() + } + for account, folders in self._dict.items() + }, + cls=self.Serializer, + ) + ) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_model/_transport.py b/platypush/plugins/mail/_model/_transport.py new file mode 100644 index 00000000..27d8b529 --- /dev/null +++ b/platypush/plugins/mail/_model/_transport.py @@ -0,0 +1,25 @@ +from enum import IntEnum + + +class TransportEncryption(IntEnum): + """ + Enum for mail transport encryption types. + """ + + NONE = 0 + STARTTLS = 1 + SSL = 2 + + @classmethod + def by_url_scheme(cls, scheme: str) -> 'TransportEncryption': + """ + Get the transport encryption type from the specified URL scheme. + """ + if scheme.endswith('+starttls'): + return cls.STARTTLS + if scheme.endswith('s'): + return cls.SSL + return cls.NONE + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_plugin/__init__.py b/platypush/plugins/mail/_plugin/__init__.py new file mode 100644 index 00000000..306b1845 --- /dev/null +++ b/platypush/plugins/mail/_plugin/__init__.py @@ -0,0 +1,12 @@ +from ._base import BaseMailPlugin +from ._in import MailInPlugin +from ._out import MailOutPlugin +from ._utils import mail_plugins + + +__all__ = [ + 'BaseMailPlugin', + 'MailInPlugin', + 'MailOutPlugin', + 'mail_plugins', +] diff --git a/platypush/plugins/mail/_plugin/_base.py b/platypush/plugins/mail/_plugin/_base.py new file mode 100644 index 00000000..ecbdb664 --- /dev/null +++ b/platypush/plugins/mail/_plugin/_base.py @@ -0,0 +1,130 @@ +import logging +import os +import re + +from abc import ABC, abstractmethod +from typing import Dict, Optional +from urllib.parse import urlparse + +from .._model import AccountConfig, ServerConfig, TransportEncryption + +email_match_re = re.compile(r'[^@]+@[^@]+\.[^@]+') + + +class BaseMailPlugin(ABC): # pylint: disable=too-few-public-methods + """ + Base class for mail plugins. + """ + + def __init__( + self, + server: str, + account: AccountConfig, + timeout: float, + keyfile: Optional[str] = None, + certfile: Optional[str] = None, + domain: Optional[str] = None, + **_, + ): + self.logger = logging.getLogger(self.__class__.__name__) + self.account = account + self.server = self._get_server_config( + server=server, + timeout=timeout, + keyfile=keyfile, + certfile=certfile, + domain=domain, + ) + + @staticmethod + def _get_path(path: str) -> str: + return os.path.abspath(os.path.expanduser(path)) + + @classmethod + def _get_server_config( + cls, + server: str, + timeout: float, + keyfile: Optional[str], + certfile: Optional[str], + domain: Optional[str], + ) -> ServerConfig: + url = urlparse(server) + assert url.hostname, f'No hostname specified: "{server}"' + + ssl = TransportEncryption.by_url_scheme(url.scheme) + port = url.port or cls.default_ports().get(ssl) + assert port, f'No port specified and no default available: "{server}"' + + if keyfile: + keyfile = cls._get_path(keyfile) + if certfile: + certfile = cls._get_path(certfile) + + return ServerConfig( + server=url.hostname, + port=port, + encryption=ssl, + timeout=timeout, + keyfile=keyfile, + certfile=certfile, + domain=domain, + ) + + @property + def from_string(self): + """ + :return: The from string for the account. + """ + return self.account.display_name or self.account.username + + @classmethod + @abstractmethod + def _matches_url_scheme(cls, scheme: str) -> bool: + raise NotImplementedError() + + @classmethod + def can_handle(cls, url: str) -> bool: + """ + Check whether the plugin can handle the specified URL. + """ + return cls._matches_url_scheme(urlparse(url).scheme) + + @classmethod + @abstractmethod + def default_ports(cls) -> Dict[TransportEncryption, int]: + """ + :return: A mapping of transport encryption to default port. + """ + raise NotImplementedError() + + @classmethod + def build( + cls, + server: str, + account: AccountConfig, + timeout: float, + keyfile: Optional[str] = None, + certfile: Optional[str] = None, + ) -> 'BaseMailPlugin': + from ._utils import mail_plugins + + url_parsed = urlparse(server) + assert url_parsed.hostname, f'No hostname specified: "{server}"' + + mail_cls = next( + (plugin for plugin in mail_plugins if plugin.can_handle(server)), + None, + ) + + assert mail_cls, f'No mail plugin found for URL: "{server}"' + return mail_cls( + server=server, + account=account, + timeout=timeout, + keyfile=keyfile, + certfile=certfile, + ) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_plugin/_in.py b/platypush/plugins/mail/_plugin/_in.py new file mode 100644 index 00000000..903703fa --- /dev/null +++ b/platypush/plugins/mail/_plugin/_in.py @@ -0,0 +1,125 @@ +from abc import ABC, abstractmethod +from typing import Dict, Iterable, List, Union + +from .._model import Mail +from ._base import BaseMailPlugin + + +class MailInPlugin(BaseMailPlugin, ABC): + """ + Base class for mail in plugins. + """ + + @abstractmethod + def get_folders(self, **_) -> list: + raise NotImplementedError() + + @abstractmethod + def get_sub_folders(self, **_) -> list: + raise NotImplementedError() + + @abstractmethod + def search( + self, criteria: Union[str, Iterable[str]], folder: str, **_ + ) -> List[Mail]: + raise NotImplementedError() + + @abstractmethod + def search_unseen_messages(self, folder: str) -> List[Mail]: + raise NotImplementedError() + + @abstractmethod + def search_flagged_messages(self, folder: str, **_) -> List[Mail]: + raise NotImplementedError() + + @abstractmethod + def search_starred_messages(self, folder: str, **_) -> List[Mail]: + raise NotImplementedError() + + @abstractmethod + def sort( + self, + folder: str, + sort_criteria: Union[str, Iterable[str]], + criteria: Union[str, Iterable[str]], + ) -> list: + raise NotImplementedError() + + @abstractmethod + def get_messages(self, *ids, with_body: bool = True, **_) -> Dict[int, Mail]: + raise NotImplementedError() + + def get_message( + self, id, with_body: bool = True, **_ # pylint: disable=redefined-builtin + ) -> Mail: + msgs = self.get_messages(id, with_body=with_body) + msg = msgs.get(id) + assert msg, f"Message {id} not found" + return msg + + @abstractmethod + def create_folder(self, folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def rename_folder(self, old_name: str, new_name: str, **_): + raise NotImplementedError() + + @abstractmethod + def delete_folder(self, folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def add_flags( + self, messages: list, flags: Union[str, Iterable[str]], folder: str, **_ + ): + raise NotImplementedError() + + @abstractmethod + def set_flags( + self, messages: list, flags: Union[str, Iterable[str]], folder: str, **_ + ): + raise NotImplementedError() + + @abstractmethod + def remove_flags( + self, messages: list, flags: Union[str, Iterable[str]], folder: str, **_ + ): + raise NotImplementedError() + + @abstractmethod + def delete_messages(self, messages: list, folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def restore_messages(self, messages: list, folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def copy_messages(self, messages: list, dest_folder: str, source_folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def move_messages(self, messages: list, dest_folder: str, source_folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def expunge_messages(self, folder: str, messages: list, **_): + raise NotImplementedError() + + @abstractmethod + def flag_messages(self, messages: list, folder: str, **_): + raise NotImplementedError() + + @abstractmethod + def unflag_messages(self, messages: List[int], folder: str = 'INBOX', **_): + raise NotImplementedError() + + def flag_message(self, message: int, folder: str, **_): + return self.flag_messages([message], folder=folder) + + def unflag_message(self, message: int, folder: str, **_): + return self.unflag_messages([message], folder=folder) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_plugin/_out.py b/platypush/plugins/mail/_plugin/_out.py new file mode 100644 index 00000000..8ad4c169 --- /dev/null +++ b/platypush/plugins/mail/_plugin/_out.py @@ -0,0 +1,138 @@ +import os + +from abc import ABC, abstractmethod +from datetime import datetime +from email.message import Message +from email.mime.application import MIMEApplication +from email.mime.audio import MIMEAudio +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from mimetypes import guess_type +from typing import Dict, Optional, Sequence, Union + +from dateutil import tz + +from .._utils import normalize_from_header +from ._base import BaseMailPlugin + + +class MailOutPlugin(BaseMailPlugin, ABC): + """ + Base class for mail out plugins. + """ + + @abstractmethod + def send_message(self, message: Message, **_): + raise NotImplementedError() + + @staticmethod + def _file_to_part(file: str) -> MIMEBase: + _type, _subtype, _type_class = 'application', 'octet-stream', MIMEApplication + mime_type, _sub_subtype = guess_type(file) + + if mime_type: + _type, _subtype = mime_type.split('/') + if _sub_subtype: + _subtype += ';' + _sub_subtype + + if _type == 'application': + _type_class = MIMEApplication + elif _type == 'audio': + _type_class = MIMEAudio + elif _type == 'image': + _type_class = MIMEImage + elif _type == 'text': + _type_class = MIMEText + + args = {} + if _type_class != MIMEText: + mode = 'rb' + args['Name'] = os.path.basename(file) + else: + mode = 'r' + + with open(file, mode) as f: + return _type_class(f.read(), _subtype, **args) + + @classmethod + def create_message( + cls, + to: Union[str, Sequence[str]], + from_: Optional[str] = None, + cc: Optional[Union[str, Sequence[str]]] = None, + bcc: Optional[Union[str, Sequence[str]]] = None, + subject: str = '', + body: str = '', + body_type: str = 'plain', + attachments: Optional[Sequence[str]] = None, + headers: Optional[Dict[str, str]] = None, + ) -> Message: + assert from_, 'from/from_ field not specified' + + content = MIMEText(body, body_type) + if attachments: + msg = MIMEMultipart() + msg.attach(content) + + for attachment in attachments: + attachment = os.path.abspath(os.path.expanduser(attachment)) + assert os.path.isfile(attachment), f'No such file: {attachment}' + part = cls._file_to_part(attachment) + part[ + 'Content-Disposition' + ] = f'attachment; filename="{os.path.basename(attachment)}"' + msg.attach(part) + else: + msg = content + + msg['From'] = from_ + msg['To'] = to if isinstance(to, str) else ', '.join(to) + msg['Cc'] = ', '.join(cc) if cc else '' + msg['Bcc'] = ', '.join(bcc) if bcc else '' + msg['Subject'] = subject + msg['Date'] = ( + datetime.now() + .replace(tzinfo=tz.tzlocal()) + .strftime('%a, %d %b %Y %H:%M:%S %z') + ) + + if headers: + for name, value in headers.items(): + msg.add_header(name, value) + + return msg + + def send( + self, + to: Union[str, Sequence[str]], + from_: Optional[str] = None, + cc: Optional[Union[str, Sequence[str]]] = None, + bcc: Optional[Union[str, Sequence[str]]] = None, + subject: str = '', + body: str = '', + body_type: str = 'plain', + attachments: Optional[Sequence[str]] = None, + headers: Optional[Dict[str, str]] = None, + **args, + ): + if not from_ and 'from' in args: + from_ = args.pop('from') + + msg = self.create_message( + to=to, + from_=normalize_from_header(from_, self.account, self.server), + cc=cc, + bcc=bcc, + subject=subject, + body=body, + body_type=body_type, + attachments=attachments, + headers=headers, + ) + + return self.send_message(msg, **args) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_plugin/_utils.py b/platypush/plugins/mail/_plugin/_utils.py new file mode 100644 index 00000000..4dc08c2f --- /dev/null +++ b/platypush/plugins/mail/_plugin/_utils.py @@ -0,0 +1,40 @@ +import importlib +import inspect +import os + +from threading import RLock +from typing import Set, Type + +import pkgutil + +from ._base import BaseMailPlugin + + +def _scan_mail_plugins() -> Set[Type[BaseMailPlugin]]: + from platypush.plugins import mail + + # Recursively scan for class inside the `mail` module that inherit from + # BaseMailPlugin + base_file = inspect.getfile(mail) + plugins = set() + + for _, name, _ in pkgutil.walk_packages( + [os.path.dirname(base_file)], prefix=f'{mail.__name__}.' + ): + module = importlib.import_module(name) + for _, cls in inspect.getmembers(module, inspect.isclass): + if not inspect.isabstract(cls) and issubclass(cls, BaseMailPlugin): + plugins.add(cls) + + return plugins + + +_mail_plugins_lock = RLock() +mail_plugins = set() + +with _mail_plugins_lock: + if not mail_plugins: + mail_plugins = _scan_mail_plugins() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/_utils.py b/platypush/plugins/mail/_utils.py new file mode 100644 index 00000000..3927bba2 --- /dev/null +++ b/platypush/plugins/mail/_utils.py @@ -0,0 +1,73 @@ +import re +from typing import Optional +from urllib.parse import urlparse + +from dns.exception import DNSException +from dns.resolver import resolve + +from ._account import AccountConfig +from ._model import ServerConfig + +_email_regex = re.compile(r'[^@]+@[^@]+\.[^@]+') +_from_header_regex = re.compile(r'^(?:"?([^"]+)"?\s+)?]+)>?$') +_ipv4_regex = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$') +_ipv6_regex = re.compile(r'^[0-9a-fA-F:]+$') + + +def infer_mail_domain(account: AccountConfig, server: ServerConfig) -> str: + """ + Infers the mail domain from the account and server configuration. + """ + + if server.domain: + return server.domain + + if account.username and _email_regex.match(account.username): + return account.username.split('@', 1)[1] + + if server.server: + if _ipv4_regex.match(server.server) or _ipv6_regex.match(server.server): + return server.server + + host = urlparse(server.server).hostname + assert host, f'Could not parse hostname from server URL: {server.server}' + host_tokens = host.split('.') + + while host_tokens: + try: + resolve(host, 'MX') + return host + except DNSException: + host_tokens.pop(0) + host = '.'.join(host_tokens) + + raise AssertionError(f'Could not resolve MX record for {host}') + + raise AssertionError('Could not infer mail domain from configuration.') + + +def infer_mail_address(account: AccountConfig, server: ServerConfig) -> str: + if account.username and _email_regex.match(account.username): + return account.username + + return f'{account.username}@{infer_mail_domain(account, server)}' + + +def normalize_from_header( + from_: Optional[str], account: AccountConfig, server: ServerConfig +) -> str: + """ + Normalizes the value of the "From" header. + """ + + if not from_: + from_ = account.display_name or account.username + + if _email_regex.match(from_): + return from_ + + m = _from_header_regex.match(from_) + if m and _email_regex.match(m.group(2)): + return f'{m.group(1)} <{m.group(2)}>' + + return f'{from_} <{infer_mail_address(account, server)}>' diff --git a/platypush/plugins/mail/imap/__init__.py b/platypush/plugins/mail/imap/__init__.py index 2a57daa4..9b8da668 100644 --- a/platypush/plugins/mail/imap/__init__.py +++ b/platypush/plugins/mail/imap/__init__.py @@ -1,11 +1,13 @@ -import email -from typing import Optional, List, Dict, Union, Any, Tuple +import ssl + +from contextlib import contextmanager +from typing import Generator, Iterable, Optional, List, Dict, Union, Any, Tuple from imapclient import IMAPClient from imapclient.response_types import Address -from platypush.plugins import action -from platypush.plugins.mail import MailInPlugin, ServerInfo, Mail +from .._model import Mail, TransportEncryption +from .._plugin import MailInPlugin class MailImapPlugin(MailInPlugin): @@ -13,114 +15,47 @@ class MailImapPlugin(MailInPlugin): Plugin to interact with a mail server over IMAP. """ - _default_port = 143 - _default_ssl_port = 993 + @classmethod + def _matches_url_scheme(cls, scheme: str) -> bool: + return scheme in ('imap', 'imaps') - def __init__( - self, - server: str, - port: Optional[int] = None, - username: Optional[str] = None, - password: Optional[str] = None, - password_cmd: Optional[str] = None, - access_token: Optional[str] = None, - oauth_mechanism: Optional[str] = 'XOAUTH2', - oauth_vendor: Optional[str] = None, - ssl: bool = False, - keyfile: Optional[str] = None, - certfile: Optional[str] = None, - timeout: Optional[int] = 60, - **kwargs - ): - """ - :param server: Server name/address. - :param port: Port (default: 143 for plain, 993 for SSL). - :param username: IMAP username. - :param password: IMAP password. - :param password_cmd: If you don't want to input your password in the configuration, run this command to fetch - or decrypt the password. - :param access_token: OAuth2 access token if the server supports OAuth authentication. - :param oauth_mechanism: OAuth2 mechanism (default: ``XOAUTH2``). - :param oauth_vendor: OAuth2 vendor (default: None). - :param ssl: Use SSL (default: False). - :param keyfile: Private key file for SSL connection if client authentication is required. - :param certfile: SSL certificate file or chain. - :param timeout: Server connect/read timeout in seconds (default: 60). - """ - super().__init__(**kwargs) - self.server_info = self._get_server_info( - server=server, - port=port, - username=username, - password=password, - password_cmd=password_cmd, - ssl=ssl, - keyfile=keyfile, - certfile=certfile, - access_token=access_token, - oauth_mechanism=oauth_mechanism, - oauth_vendor=oauth_vendor, - timeout=timeout, - ) + @classmethod + def default_ports(cls) -> Dict[TransportEncryption, int]: + return { + TransportEncryption.NONE: 143, + TransportEncryption.SSL: 993, + } - def _get_server_info( - self, - server: Optional[str] = None, - port: Optional[int] = None, - username: Optional[str] = None, - password: Optional[str] = None, - password_cmd: Optional[str] = None, - access_token: Optional[str] = None, - oauth_mechanism: Optional[str] = None, - oauth_vendor: Optional[str] = None, - ssl: Optional[bool] = None, - keyfile: Optional[str] = None, - certfile: Optional[str] = None, - timeout: Optional[int] = None, - **kwargs - ) -> ServerInfo: - return super()._get_server_info( - server=server, - port=port, - username=username, - password=password, - password_cmd=password_cmd, - ssl=ssl, - keyfile=keyfile, - certfile=certfile, - default_port=self._default_port, - default_ssl_port=self._default_ssl_port, - access_token=access_token, - oauth_mechanism=oauth_mechanism, - oauth_vendor=oauth_vendor, - timeout=timeout, - ) - - def connect(self, **connect_args) -> IMAPClient: - info = self._get_server_info(**connect_args) - self.logger.info('Connecting to {}'.format(info.server)) + @contextmanager + def connect(self) -> Generator[IMAPClient, None, None]: + has_ssl = self.server.encryption == TransportEncryption.SSL context = None - - if info.ssl: - import ssl - + if has_ssl and self.server.certfile: context = ssl.create_default_context() - context.load_cert_chain(certfile=info.certfile, keyfile=info.keyfile) - - client = IMAPClient( - host=info.server, port=info.port, ssl=info.ssl, ssl_context=context - ) - if info.password: - client.login(info.username, info.password) - elif info.access_token: - client.oauth2_login( - info.username, - access_token=info.access_token, - mech=info.oauth_mechanism, - vendor=info.oauth_vendor, + context.load_cert_chain( + certfile=self.server.certfile, keyfile=self.server.keyfile ) - return client + client = IMAPClient( + host=self.server.server, + port=self.server.port, + ssl=has_ssl, + ssl_context=context, + ) + + if self.account.access_token: + client.oauth2_login( + self.account.username, + access_token=self.account.access_token, + mech=self.account.oauth_mechanism or 'XOAUTH2', + vendor=self.account.oauth_vendor, + ) + else: + pwd = self.account.get_password() + client.login(self.account.username, pwd) + + yield client + client.logout() @staticmethod def _get_folders(data: List[tuple]) -> List[Dict[str, str]]: @@ -137,90 +72,12 @@ class MailImapPlugin(MailInPlugin): return folders - @action - def get_folders( - self, folder: str = '', pattern: str = '*', **connect_args - ) -> List[Dict[str, str]]: - """ - Get the list of all the folders hosted on the server or those matching a pattern. - - :param folder: Base folder (default: root). - :param pattern: Pattern to search (default: None). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - :return: Example: - - .. code-block:: json - - [ - { - "name": "INBOX", - "flags": "\\Noinferiors", - "delimiter": "/" - }, - { - "name": "Archive", - "flags": "\\Noinferiors", - "delimiter": "/" - }, - { - "name": "Spam", - "flags": "\\Noinferiors", - "delimiter": "/" - } - ] - - """ - with self.connect(**connect_args) as client: - data = client.list_folders(directory=folder, pattern=pattern) - - return self._get_folders(data) - - @action - def get_sub_folders( - self, folder: str = '', pattern: str = '*', **connect_args - ) -> List[Dict[str, str]]: - """ - Get the list of all the sub-folders hosted on the server or those matching a pattern. - - :param folder: Base folder (default: root). - :param pattern: Pattern to search (default: None). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - :return: Example: - - .. code-block:: json - - [ - { - "name": "INBOX", - "flags": "\\Noinferiors", - "delimiter": "/" - }, - { - "name": "Archive", - "flags": "\\Noinferiors", - "delimiter": "/" - }, - { - "name": "Spam", - "flags": "\\Noinferiors", - "delimiter": "/" - } - ] - - """ - with self.connect(**connect_args) as client: - data = client.list_sub_folders(directory=folder, pattern=pattern) - - return self._get_folders(data) - @staticmethod - def _parse_address(imap_addr: Address) -> Dict[str, str]: + def _parse_address(imap_addr: Address) -> Dict[str, Optional[str]]: return { 'name': imap_addr.name.decode() if imap_addr.name else None, 'route': imap_addr.route.decode() if imap_addr.route else None, - 'email': '{name}@{host}'.format( - name=imap_addr.mailbox.decode(), host=imap_addr.host.decode() - ), + 'email': imap_addr.mailbox.decode() + '@' + imap_addr.host.decode(), } @classmethod @@ -264,456 +121,194 @@ class MailImapPlugin(MailInPlugin): message['sender'] = cls._parse_addresses(envelope.sender) message['subject'] = envelope.subject.decode() if envelope.subject else None message['to'] = cls._parse_addresses(envelope.to) + if b'BODY[]' in imap_msg: + message['content'] = imap_msg[b'BODY[]'] return Mail(**message) - @action + @staticmethod + def _convert_flags(flags: Union[str, Iterable[str]]) -> List[bytes]: + if isinstance(flags, str): + flags = [flag.strip() for flag in flags.split(',')] + return [('\\' + flag).encode() for flag in flags] + + def get_folders( + self, folder: str = '', pattern: str = '*', **_ + ) -> List[Dict[str, str]]: + with self.connect() as client: + data = client.list_folders(directory=folder, pattern=pattern) + + return self._get_folders(data) + + def get_sub_folders( + self, folder: str = '', pattern: str = '*', **_ + ) -> List[Dict[str, str]]: + with self.connect() as client: + data = client.list_sub_folders(directory=folder, pattern=pattern) + + return self._get_folders(data) + def search( self, - criteria: Union[str, List[str]] = 'ALL', + criteria: Union[str, Iterable[str]] = 'ALL', folder: str = 'INBOX', - attributes: Optional[List[str]] = None, - **connect_args + attributes: Optional[Iterable[str]] = None, + **_, ) -> List[Mail]: - """ - Search for messages on the server that fit the specified criteria. - - :param criteria: It should be a sequence of one or more criteria items. Each criteria item may be either unicode - or bytes (default: ``ALL``). Example values:: - - ['UNSEEN'] - ['SMALLER', 500] - ['NOT', 'DELETED'] - ['TEXT', 'foo bar', 'FLAGGED', 'SUBJECT', 'baz'] - ['SINCE', '2020-03-14T12:13:45+00:00'] - - It is also possible (but not recommended) to pass the combined criteria as a single string. In this case - IMAPClient won't perform quoting, allowing lower-level specification of criteria. Examples of this style:: - - 'UNSEEN' - 'SMALLER 500' - 'NOT DELETED' - 'TEXT "foo bar" FLAGGED SUBJECT "baz"' - 'SINCE 03-Apr-2005' - - To support complex search expressions, criteria lists can be nested. The following will match messages that - are both not flagged and do not have "foo" in the subject:: - - ['NOT', ['SUBJECT', 'foo', 'FLAGGED']] - - :param folder: Folder to search (default: ``INBOX``). - :param attributes: Attributes that should be retrieved, according to - `RFC 3501 `_ - (default: ``ALL`` = ``[FLAGS INTERNALDATE RFC822.SIZE ENVELOPE]``). - Note that ``BODY`` will be ignored if specified here for performance reasons - use :meth:`.get_message` if - you want to get the full content of a message known its ID from :meth:`.search`. - - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - :return: List of messages matching the criteria. Example: - - .. code-block:: json - - [ - { - "id": 702, - "seq": 671, - "flags": [ - "nonjunk" - ], - "internal_date": "2020-08-30T00:31:52+00:00", - "size": 2908738, - "bcc": {}, - "cc": {}, - "date": "2020-08-30T00:31:52+00:00", - "from": { - "test123@gmail.com": { - "name": "A test", - "route": null, - "email": "test123@gmail.com" - } - }, - "message_id": "", - "in_reply_to": "", - "reply_to": {}, - "sender": { - "test123@gmail.com": { - "name": "A test", - "route": null, - "email": "test123@gmail.com" - } - }, - "subject": "Test email", - "to": { - "me@gmail.com": { - "name": null, - "route": null, - "email": "me@gmail.com" - } - } - } - ] - - """ if not attributes: - attributes = ['ALL'] + attributes = ['FLAGS', 'INTERNALDATE', 'RFC822.SIZE', 'ENVELOPE'] else: attributes = [attr.upper() for attr in attributes] data = {} - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(folder, readonly=True) - ids = client.search(criteria) - if len(ids): + ids = client.search(criteria) # type: ignore + if ids: data = client.fetch(list(ids), attributes) return [ self._parse_message(msg_id, data[msg_id]) for msg_id in sorted(data.keys()) ] - @action - def search_unseen_messages( - self, folder: str = 'INBOX', **connect_args - ) -> List[Mail]: - """ - Shortcut for :meth:`.search` that returns only the unread messages. - """ - return self.search( - criteria='UNSEEN', directory=folder, attributes=['ALL'], **connect_args - ) + def search_unseen_messages(self, folder: str = 'INBOX') -> List[Mail]: + return self.search(criteria='UNSEEN', directory=folder) - @action - def search_flagged_messages( - self, folder: str = 'INBOX', **connect_args - ) -> List[Mail]: - """ - Shortcut for :meth:`.search` that returns only the flagged/starred messages. - """ - return self.search( - criteria='Flagged', directory=folder, attributes=['ALL'], **connect_args - ) + def search_flagged_messages(self, folder: str = 'INBOX', **_) -> List[Mail]: + return self.search(criteria='Flagged', directory=folder) - @action - def search_starred_messages( - self, folder: str = 'INBOX', **connect_args - ) -> List[Mail]: - """ - Shortcut for :meth:`.search` that returns only the flagged/starred messages. - """ - return self.search_flagged_messages(folder, **connect_args) + def search_starred_messages(self, folder: str = 'INBOX', **_) -> List[Mail]: + return self.search_flagged_messages(folder) - @action def sort( self, folder: str = 'INBOX', - sort_criteria: Union[str, List[str]] = 'ARRIVAL', - criteria: Union[str, List[str]] = 'ALL', - **connect_args + sort_criteria: Union[str, Iterable[str]] = 'ARRIVAL', + criteria: Union[str, Iterable[str]] = 'ALL', ) -> List[int]: - """ - Return a list of message ids from the currently selected folder, sorted by ``sort_criteria`` and optionally - filtered by ``criteria``. Note that SORT is an extension to the IMAP4 standard so it may not be supported by - all IMAP servers. - - :param folder: Folder to be searched (default: ``INBOX``). - :param sort_criteria: It may be a sequence of strings or a single string. IMAPClient will take care any required - conversions. Valid *sort_criteria* values:: - - .. code-block:: python - - ['ARRIVAL'] - ['SUBJECT', 'ARRIVAL'] - 'ARRIVAL' - 'REVERSE SIZE' - - :param criteria: Optional filter for the messages, as specified in :meth:`.search`. - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - :return: A list of message IDs that fit the criteria. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(folder, readonly=True) - msg_ids = client.sort(sort_criteria=sort_criteria, criteria=criteria) + msg_ids = client.sort(sort_criteria=sort_criteria, criteria=criteria) # type: ignore return msg_ids - @action - def get_message(self, message: int, folder: str = 'INBOX', **connect_args) -> Mail: - """ - Get the full content of a message given the ID returned by :meth:`.search`. - - :param message: Message ID. - :param folder: Folder name (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - :return: A message in the same format as :meth:`.search`, with an added ``payload`` attribute containing the - body/payload. - """ - with self.connect(**connect_args) as client: + def get_messages( + self, + *ids: int, + folder: str = 'INBOX', + with_body: bool = True, + **_, + ) -> Dict[int, Mail]: + ret = {} + with self.connect() as client: client.select_folder(folder, readonly=True) - data = client.fetch(message, ['ALL', 'RFC822']) - assert message in data, 'No such message ID: {}'.format(message) + attrs = ['FLAGS', 'RFC822.SIZE', 'INTERNALDATE', 'ENVELOPE'] + if with_body: + attrs.append('BODY[]') - data = data[message] - ret = self._parse_message(message, data) - msg = email.message_from_bytes(data[b'RFC822']) - ret.payload = msg.get_payload() + data = client.fetch(ids, attrs) + for id in ids: # pylint: disable=redefined-builtin + msg = data.get(id) + if not msg: + continue + + ret[id] = self._parse_message(id, msg) return ret - @action - def create_folder(self, folder: str, **connect_args): - """ - Create a folder on the server. - - :param folder: Folder name. - :param connect_args: - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + def create_folder(self, folder: str, **_): + with self.connect() as client: client.create_folder(folder) - @action - def rename_folder(self, old_name: str, new_name: str, **connect_args): - """ - Rename a folder on the server. - - :param old_name: Previous name - :param new_name: New name - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + def rename_folder(self, old_name: str, new_name: str, **_): + with self.connect() as client: client.rename_folder(old_name, new_name) - @action - def delete_folder(self, folder: str, **connect_args): - """ - Delete a folder from the server. - - :param folder: Folder name. - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + def delete_folder(self, folder: str, **_): + with self.connect() as client: client.delete_folder(folder) - @staticmethod - def _convert_flags(flags: Union[str, List[str]]) -> List[bytes]: - if isinstance(flags, str): - flags = [flag.strip() for flag in flags.split(',')] - return [('\\' + flag).encode() for flag in flags] - - @action def add_flags( self, messages: List[int], - flags: Union[str, List[str]], + flags: Union[str, Iterable[str]], folder: str = 'INBOX', - **connect_args + **_, ): - """ - Add a set of flags to the specified set of message IDs. - - :param messages: List of message IDs. - :param flags: List of flags to be added. Examples: - - .. code-block:: python - - ['Flagged'] - ['Seen', 'Deleted'] - ['Junk'] - - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(folder) client.add_flags(messages, self._convert_flags(flags)) - @action def set_flags( self, messages: List[int], - flags: Union[str, List[str]], + flags: Union[str, Iterable[str]], folder: str = 'INBOX', - **connect_args + **_, ): - """ - Set a set of flags to the specified set of message IDs. - - :param messages: List of message IDs. - :param flags: List of flags to be added. Examples: - - .. code-block:: python - - ['Flagged'] - ['Seen', 'Deleted'] - ['Junk'] - - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(folder) client.set_flags(messages, self._convert_flags(flags)) - @action def remove_flags( self, messages: List[int], - flags: Union[str, List[str]], + flags: Union[str, Iterable[str]], folder: str = 'INBOX', - **connect_args + **_, ): - """ - Remove a set of flags to the specified set of message IDs. - - :param messages: List of message IDs. - :param flags: List of flags to be added. Examples: - - .. code-block:: python - - ['Flagged'] - ['Seen', 'Deleted'] - ['Junk'] - - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(folder) client.remove_flags(messages, self._convert_flags(flags)) - @action - def flag_messages(self, messages: List[int], folder: str = 'INBOX', **connect_args): - """ - Add a flag/star to the specified set of message IDs. + def flag_messages(self, messages: List[int], folder: str = 'INBOX', **_): + return self.add_flags(messages, ['Flagged'], folder=folder) - :param messages: List of message IDs. - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - return self.add_flags(messages, ['Flagged'], folder=folder, **connect_args) + def unflag_messages(self, messages: List[int], folder: str = 'INBOX', **_): + return self.remove_flags(messages, ['Flagged'], folder=folder) - @action - def unflag_messages( - self, messages: List[int], folder: str = 'INBOX', **connect_args - ): - """ - Remove a flag/star from the specified set of message IDs. - - :param messages: List of message IDs. - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - return self.remove_flags(messages, ['Flagged'], folder=folder, **connect_args) - - @action - def flag_message(self, message: int, folder: str = 'INBOX', **connect_args): - """ - Add a flag/star to the specified set of message ID. - - :param message: Message ID. - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - return self.flag_messages([message], folder=folder, **connect_args) - - @action - def unflag_message(self, message: int, folder: str = 'INBOX', **connect_args): - """ - Remove a flag/star from the specified set of message ID. - - :param message: Message ID. - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - return self.unflag_messages([message], folder=folder, **connect_args) - - @action def delete_messages( self, messages: List[int], folder: str = 'INBOX', expunge: bool = True, - **connect_args + **_, ): - """ - Set a specified set of message IDs as deleted. - - :param messages: List of message IDs. - :param folder: IMAP folder (default: ``INBOX``). - :param expunge: If set then the messages will also be expunged from the folder, otherwise they will only be - marked as deleted (default: ``True``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - self.add_flags(messages, ['Deleted'], folder=folder, **connect_args) + self.add_flags(messages, ['Deleted'], folder=folder) if expunge: - self.expunge_messages(folder=folder, messages=messages, **connect_args) + self.expunge_messages(folder=folder, messages=messages) - @action - def undelete_messages( - self, messages: List[int], folder: str = 'INBOX', **connect_args - ): - """ - Remove the ``Deleted`` flag from the specified set of message IDs. + def restore_messages(self, messages: List[int], folder: str = 'INBOX', **_): + return self.remove_flags(messages, ['Deleted'], folder=folder) - :param messages: List of message IDs. - :param folder: IMAP folder (default: ``INBOX``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - return self.remove_flags(messages, ['Deleted'], folder=folder, **connect_args) - - @action def copy_messages( self, messages: List[int], dest_folder: str, source_folder: str = 'INBOX', - **connect_args + **_, ): - """ - Copy a set of messages IDs from a folder to another. - - :param messages: List of message IDs. - :param source_folder: Source folder. - :param dest_folder: Destination folder. - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(source_folder) client.copy(messages, dest_folder) - @action def move_messages( self, messages: List[int], dest_folder: str, source_folder: str = 'INBOX', - **connect_args + **_, ): - """ - Move a set of messages IDs from a folder to another. - - :param messages: List of message IDs. - :param source_folder: Source folder. - :param dest_folder: Destination folder. - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(source_folder) client.move(messages, dest_folder) - @action def expunge_messages( self, folder: str = 'INBOX', messages: Optional[List[int]] = None, - **connect_args + **_, ): - """ - When ``messages`` is not set, remove all the messages from ``folder`` marked as ``Deleted``. - - :param folder: IMAP folder (default: ``INBOX``). - :param messages: List of message IDs to expunge (default: all those marked as ``Deleted``). - :param connect_args: Arguments to pass to :meth:`._get_server_info` for server configuration override. - """ - with self.connect(**connect_args) as client: + with self.connect() as client: client.select_folder(folder) client.expunge(messages) diff --git a/platypush/plugins/mail/imap/manifest.yaml b/platypush/plugins/mail/imap/manifest.yaml deleted file mode 100644 index 87adccd6..00000000 --- a/platypush/plugins/mail/imap/manifest.yaml +++ /dev/null @@ -1,7 +0,0 @@ -manifest: - events: {} - install: - pip: - - imapclient - package: platypush.plugins.mail.imap - type: plugin diff --git a/platypush/plugins/mail/manifest.yaml b/platypush/plugins/mail/manifest.yaml new file mode 100644 index 00000000..3dcb5210 --- /dev/null +++ b/platypush/plugins/mail/manifest.yaml @@ -0,0 +1,20 @@ +manifest: + events: + - platypush.message.event.mail.FlaggedMailEvent + - platypush.message.event.mail.SeenMailEvent + - platypush.message.event.mail.UnflaggedMailEvent + - platypush.message.event.mail.UnseenMailEvent + install: + apk: + - py3-dnspython + apt: + - python3-dnspython + dnf: + - python-dnspython + pacman: + - python-dnspython + pip: + - dnspython + - imapclient + package: platypush.plugins.mail + type: plugin diff --git a/platypush/plugins/mail/smtp/__init__.py b/platypush/plugins/mail/smtp/__init__.py index e0b14029..ecfcdb75 100644 --- a/platypush/plugins/mail/smtp/__init__.py +++ b/platypush/plugins/mail/smtp/__init__.py @@ -1,9 +1,11 @@ +from contextlib import contextmanager from email.message import Message -from typing import Optional, List +from typing import Dict, Generator from smtplib import SMTP, SMTP_SSL -from platypush.plugins.mail import MailOutPlugin, ServerInfo +from .._model import TransportEncryption +from .._plugin import MailOutPlugin class MailSmtpPlugin(MailOutPlugin): @@ -11,80 +13,63 @@ class MailSmtpPlugin(MailOutPlugin): Plugin to interact with a mail server over SMTP. """ - _default_port = 25 - _default_ssl_port = 465 + @classmethod + def _matches_url_scheme(cls, scheme: str) -> bool: + return scheme in ('smtp', 'smtps') - def __init__(self, server: Optional[str] = None, port: Optional[int] = None, local_hostname: Optional[str] = None, - source_address: Optional[List[str]] = None, username: Optional[str] = None, - password: Optional[str] = None, password_cmd: Optional[str] = None, access_token: Optional[str] = None, - oauth_mechanism: Optional[str] = 'XOAUTH2', oauth_vendor: Optional[str] = None, ssl: bool = False, - keyfile: Optional[str] = None, certfile: Optional[str] = None, timeout: Optional[int] = 60, **kwargs): - """ - :param server: Server name/address. - :param port: Port (default: 25 for plain, 465 for SSL). - :param local_hostname: If specified, local_hostname is used as the FQDN of the local host in the HELO/EHLO - command. Otherwise, the local hostname is found using socket.getfqdn(). - :param source_address: The optional source_address parameter allows binding to some specific source address in - a machine with multiple network interfaces, and/or to some specific source TCP port. It takes a 2-tuple - (host, port), for the socket to bind to as its source address before connecting. If omitted (or if host or - port are '' and/or 0 respectively) the OS default behavior will be used. - :param username: SMTP username. - :param password: SMTP password. - :param password_cmd: If you don't want to input your password in the configuration, run this command to fetch - or decrypt the password. - :param access_token: OAuth2 access token if the server supports OAuth authentication. - :param oauth_mechanism: OAuth2 mechanism (default: ``XOAUTH2``). - :param oauth_vendor: OAuth2 vendor (default: None). - :param ssl: Use SSL (default: False). - :param keyfile: Private key file for SSL connection if client authentication is required. - :param certfile: SSL certificate file or chain. - :param timeout: Server connect/read timeout in seconds (default: 60). - """ - super().__init__(**kwargs) - self.local_hostname = local_hostname - self.source_address = source_address - self.server_info = self._get_server_info(server=server, port=port, username=username, password=password, - password_cmd=password_cmd, ssl=ssl, keyfile=keyfile, certfile=certfile, - access_token=access_token, oauth_mechanism=oauth_mechanism, - oauth_vendor=oauth_vendor, timeout=timeout) - - def _get_server_info(self, server: Optional[str] = None, port: Optional[int] = None, username: Optional[str] = None, - password: Optional[str] = None, password_cmd: Optional[str] = None, - ssl: Optional[bool] = None, keyfile: Optional[str] = None, certfile: Optional[str] = None, - timeout: Optional[int] = None, **kwargs) -> ServerInfo: - return super()._get_server_info(server=server, port=port, username=username, password=password, - password_cmd=password_cmd, ssl=ssl, keyfile=keyfile, certfile=certfile, - default_port=self._default_port, default_ssl_port=self._default_ssl_port, - timeout=timeout) - - def connect(self, **connect_args) -> SMTP: - info = self._get_server_info(**connect_args) - self.logger.info('Connecting to {}'.format(info.server)) - smtp_args = { - 'host': info.server, - 'port': info.port, - 'local_hostname': self.local_hostname, - 'source_address': self.source_address, + @classmethod + def default_ports(cls) -> Dict[TransportEncryption, int]: + return { + TransportEncryption.NONE: 25, + TransportEncryption.SSL: 465, + TransportEncryption.STARTTLS: 587, } - if info.ssl: + @contextmanager + def connect(self) -> Generator[SMTP, None, None]: + smtp_args = { + 'host': self.server.server, + 'port': self.server.port, + } + + if self.server.encryption == TransportEncryption.SSL: client_type = SMTP_SSL - smtp_args.update(certfile=info.certfile, keyfile=info.keyfile) + if self.server.certfile: + smtp_args.update(certfile=self.server.certfile) + if self.server.keyfile: + smtp_args.update(keyfile=self.server.keyfile) else: client_type = SMTP client = client_type(**smtp_args) - if info.password: - client.login(info.username, info.password) - return client + if self.server.encryption == TransportEncryption.STARTTLS: + client.ehlo() + client.starttls() + else: + client.ehlo_or_helo_if_needed() - def send_message(self, message: Message, **connect_args): - with self.connect(**connect_args) as client: - errors = client.sendmail(message['From'], message['To'], message.as_string()) + pwd = None + try: + pwd = self.account.get_password() + except AssertionError: + pass - if errors: - return None, ['{}: {}'.format(code, err) for code, err in errors.items()] + if pwd: + client.login(self.account.username, pwd) + + yield client + client.quit() + + def send_message(self, message: Message, **_): + with self.connect() as client: + errors = client.sendmail( + message['From'], message['To'], message.as_string() + ) + + assert not errors, 'Failed to send message: ' + str( + [f'{code}: {err}' for code, err in errors.items()] + ) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/mail/smtp/manifest.yaml b/platypush/plugins/mail/smtp/manifest.yaml deleted file mode 100644 index baf0400b..00000000 --- a/platypush/plugins/mail/smtp/manifest.yaml +++ /dev/null @@ -1,6 +0,0 @@ -manifest: - events: {} - install: - pip: [] - package: platypush.plugins.mail.smtp - type: plugin diff --git a/platypush/plugins/mailgun/__init__.py b/platypush/plugins/mailgun/__init__.py index 97d9f9e9..e2a78e37 100644 --- a/platypush/plugins/mailgun/__init__.py +++ b/platypush/plugins/mailgun/__init__.py @@ -1,40 +1,55 @@ -import os from typing import Optional, Union, Sequence import requests from requests.auth import HTTPBasicAuth -from platypush.plugins import action -from platypush.plugins.mail import MailOutPlugin +from platypush.plugins import Plugin, action -class MailgunPlugin(MailOutPlugin): +class MailgunPlugin(Plugin): """ Mailgun integration. """ - def __init__(self, api_key: str, api_base_url: str = 'https://api.mailgun.net/v3', **kwargs): + + def __init__( + self, + api_key: str, + api_base_url: str = 'https://api.mailgun.net/v3', + domain: Optional[str] = None, + timeout: float = 20.0, + **kwargs, + ): """ :param api_key: Mailgun API secret key. :param api_base_url: Use ``https://api.eu.mailgun.net/v3`` if you are using an EU account. + :param domain: Default registered domain that should be used for sending + emails if not specified in the :meth:`.send` action. + :param timeout: Default timeout for the requests (default: 20 seconds). """ super().__init__(**kwargs) self._api_key = api_key self._api_base_url = api_base_url - - def send_message(self, *_, **__): - pass + self._domain = domain + self._timeout = timeout @action def send( - self, domain: str, to: Union[str, Sequence[str]], from_: Optional[str] = None, - cc: Optional[Union[str, Sequence[str]]] = None, bcc: Optional[Union[str, Sequence[str]]] = None, - subject: str = '', body: str = '', body_type: str = 'plain', attachments: Optional[Sequence[str]] = None, - **kwargs - ): + self, + to: Union[str, Sequence[str]], + from_: Optional[str] = None, + cc: Optional[Union[str, Sequence[str]]] = None, + bcc: Optional[Union[str, Sequence[str]]] = None, + subject: str = '', + body: str = '', + body_type: str = 'plain', + domain: Optional[str] = None, + **kwargs, + ): """ Send an email through Mailgun. - :param domain: From which registered domain the email(s) should be sent. + :param domain: From which registered domain the email(s) should be sent + (default: the domain specified in the plugin configuration). :param to: Receiver(s), as comma-separated strings or list. :param from_: Sender email address (``from`` is also supported outside of Python contexts). :param cc: Carbon-copy addresses, as comma-separated strings or list @@ -42,21 +57,28 @@ class MailgunPlugin(MailOutPlugin): :param subject: Mail subject. :param body: Mail body. :param body_type: Mail body type - ``text`` or ``html``. - :param attachments: List of attachment files to send. """ + domain = domain or self._domain + assert domain, 'No domain specified' from_ = from_ or kwargs.pop('from', None) rs = requests.post( f'{self._api_base_url}/{domain}/messages', + timeout=self._timeout, auth=HTTPBasicAuth('api', self._api_key), data={ 'to': ', '.join([to] if isinstance(to, str) else to), 'subject': subject, **{'html' if body_type == 'html' else 'text': body}, **({'from': from_} if from_ else {}), - **({'cc': ', '.join([cc] if isinstance(cc, str) else cc)} if cc else {}), - **({'bcc': ', '.join([bcc] if isinstance(bcc, str) else bcc)} if bcc else {}), + **( + {'cc': ', '.join([cc] if isinstance(cc, str) else cc)} if cc else {} + ), + **( + {'bcc': ', '.join([bcc] if isinstance(bcc, str) else bcc)} + if bcc + else {} + ), }, - files=[os.path.expanduser(attachment) for attachment in (attachments or [])] ) rs.raise_for_status() diff --git a/setup.py b/setup.py index c597e429..0bc17d61 100755 --- a/setup.py +++ b/setup.py @@ -270,8 +270,8 @@ setup( ], # Support for LCD display integration 'lcd': ['RPi.GPIO', 'RPLCD'], - # Support for IMAP mail integration - 'imap': ['imapclient'], + # Support for email integration + 'mail': ['imapclient', 'dnspython'], # Support for NextCloud integration 'nextcloud': ['nextcloud-api-wrapper'], # Support for VLC integration -- 2.45.1