From bc7c248f72bfa8a7f22a1aa2a819abb039a6f48c Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Mon, 30 Dec 2019 09:33:26 +0100 Subject: [PATCH] Added Google Pub/Sub integration --- platypush/backend/__init__.py | 41 ++++++----- platypush/backend/google/pubsub.py | 93 ++++++++++++++++++++++++ platypush/message/event/google/pubsub.py | 13 ++++ platypush/plugins/google/pubsub.py | 89 +++++++++++++++++++++++ requirements.txt | 3 + setup.py | 2 + 6 files changed, 222 insertions(+), 19 deletions(-) create mode 100644 platypush/backend/google/pubsub.py create mode 100644 platypush/message/event/google/pubsub.py create mode 100644 platypush/plugins/google/pubsub.py diff --git a/platypush/backend/__init__.py b/platypush/backend/__init__.py index d13b6bbfe8..e81356b197 100644 --- a/platypush/backend/__init__.py +++ b/platypush/backend/__init__.py @@ -53,7 +53,8 @@ class Backend(Thread, EventGenerator): self.bus = bus or Bus() self.device_id = Config.get('device_id') self.thread_id = None - self._stop = False + self._should_stop = False + self._stop_event = threading.Event() self._kwargs = kwargs self.logger = logging.getLogger(self.__class__.__name__) @@ -95,26 +96,23 @@ class Backend(Thread, EventGenerator): if isinstance(msg, StopEvent) and msg.targets_me(): self.logger.info('Received STOP event on {}'.format(self.__class__.__name__)) - self._stop = True + self._should_stop = True else: msg.backend = self # Augment message to be able to process responses self.bus.post(msg) - def _is_expected_response(self, msg): """ Internal only - returns true if we are expecting for a response and msg is that response """ - return self._request_context \ + return self._request_context \ and isinstance(msg, Response) \ and msg.id == self._request_context['request'].id - def _get_backend_config(self): config_name = 'backend.' + self.__class__.__name__.split('Backend')[0].lower() return Config.get(config_name) - def _setup_response_handler(self, request, on_response, response_timeout): def _timeout_hndl(): raise RuntimeError('Timed out while waiting for a response from {}'. @@ -127,7 +125,7 @@ class Backend(Thread, EventGenerator): } resp_backend = self.__class__(bus=self.bus, _req_ctx=req_ctx, - **self._get_backend_config(), **self._kwargs) + **self._get_backend_config(), **self._kwargs) # Set the response timeout if response_timeout: @@ -135,12 +133,12 @@ class Backend(Thread, EventGenerator): resp_backend.start() - def send_event(self, event, **kwargs): """ Send an event message on the backend. - :param event: Event to send. It can be a dict, a string/bytes UTF-8 JSON, or a platypush.message.event.Event object. + :param event: Event to send. It can be a dict, a string/bytes UTF-8 JSON, or a platypush.message.event.Event + object. """ event = Event.build(event) @@ -152,18 +150,20 @@ class Backend(Thread, EventGenerator): self.send_message(event, **kwargs) - def send_request(self, request, on_response=None, response_timeout=_default_response_timeout, **kwargs): """ Send a request message on the backend. - :param request: The request, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.request.Request object. + :param request: The request, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.request.Request + object. - :param on_response: Optional callback that will be called when a response is received. If set, this method will synchronously wait for a response before exiting. + :param on_response: Optional callback that will be called when a response is received. If set, this method will + synchronously wait for a response before exiting. :type on_response: function - :param response_timeout: If on_response is set, the backend will raise an exception if the response isn't received within this number of seconds (default: None) + :param response_timeout: If on_response is set, the backend will raise an exception if the response isn't + received within this number of seconds (default: None) :type response_timeout: float """ @@ -177,12 +177,12 @@ class Backend(Thread, EventGenerator): self.send_message(request, **kwargs) - def send_response(self, response, request, **kwargs): """ Send a response message on the backend. - :param response: The response, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.response.Response object + :param response: The response, either a dict, a string/bytes UTF-8 JSON, or a + :class:`platypush.message.response.Response` object. :param request: Associated request, used to set the response parameters that will link them """ @@ -191,7 +191,6 @@ class Backend(Thread, EventGenerator): self.send_message(response, **kwargs) - def send_message(self, msg, queue_name=None, **kwargs): """ Sends a platypush.message.Message to a node. @@ -200,7 +199,8 @@ class Backend(Thread, EventGenerator): other consumers through the configured Redis main queue. :param msg: The message to send - :param queue_name: Send the message on a specific queue (default: the queue_name configured on the Redis backend) + :param queue_name: Send the message on a specific queue (default: the queue_name configured on the Redis + backend) """ try: @@ -214,7 +214,6 @@ class Backend(Thread, EventGenerator): redis.send_message(msg, queue_name=queue_name) - def run(self): """ Starts the backend thread. To be implemented in the derived classes """ self.thread_id = threading.get_ident() @@ -231,12 +230,16 @@ class Backend(Thread, EventGenerator): thread_id=self.thread_id) self.send_message(evt) + self._stop_event.set() self.on_stop() Thread(target=_async_stop).start() def should_stop(self): - return self._stop + return self._should_stop + + def wait_stop(self, timeout=None): + self._stop_event.wait(timeout) def _get_redis(self): import redis diff --git a/platypush/backend/google/pubsub.py b/platypush/backend/google/pubsub.py new file mode 100644 index 0000000000..f122e1494f --- /dev/null +++ b/platypush/backend/google/pubsub.py @@ -0,0 +1,93 @@ +import json + +from typing import Optional, List + +from platypush.backend import Backend +from platypush.context import get_plugin +from platypush.message.event.google.pubsub import GooglePubsubMessageEvent +from platypush.utils import set_thread_name + + +class GooglePubsubBackend(Backend): + """ + Subscribe to a list of topics on a Google Pub/Sub instance. See + :class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your + project and credentials file. + + Triggers: + + * :class:`platypush.message.event.google.pubsub.GooglePubsubMessageEvent` when a new message is received on + a subscribed topic. + + Requires: + + * **google-cloud-pubsub** (``pip install google-cloud-pubsub``) + + """ + + def __init__(self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs): + """ + :param topics: List of topics to subscribe. You can either specify the full topic name in the format + ``projects//topics/``, where ```` must be the ID of your + Google Pub/Sub project, or just ```` - in such case it's implied that you refer to the + ``topic_name`` under the ``project_id`` of your service credentials. + :param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the + ``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``). + """ + + super().__init__(*args, **kwargs) + self.topics = topics + + if credentials_file: + self.credentials_file = credentials_file + else: + plugin = self._get_plugin() + self.credentials_file = plugin.credentials_file + + @staticmethod + def _get_plugin(): + return get_plugin('google.pubsub') + + def _message_callback(self, topic): + def callback(msg): + data = msg.data.decode() + # noinspection PyBroadException + try: + data = json.loads(data) + except: + pass + + msg.ack() + self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data)) + + return callback + + def run(self): + # noinspection PyPackageRequirements + from google.cloud import pubsub_v1 + # noinspection PyPackageRequirements + from google.api_core.exceptions import AlreadyExists + + super().run() + set_thread_name('GooglePubSub') + plugin = self._get_plugin() + project_id = plugin.get_project_id() + credentials = plugin.get_credentials(plugin.subscriber_audience) + subscriber = pubsub_v1.SubscriberClient(credentials=credentials) + + for topic in self.topics: + if not topic.startswith('projects/{}/topics/'.format(project_id)): + topic = 'projects/{}/topics/{}'.format(project_id, topic) + subscription_name = '/'.join([*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]) + + try: + subscriber.create_subscription(name=subscription_name, topic=topic) + except AlreadyExists: + pass + + subscriber.subscribe(subscription_name, self._message_callback(topic)) + + self.wait_stop() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/event/google/pubsub.py b/platypush/message/event/google/pubsub.py new file mode 100644 index 0000000000..60cc006063 --- /dev/null +++ b/platypush/message/event/google/pubsub.py @@ -0,0 +1,13 @@ +from platypush.message.event import Event + + +class GooglePubsubMessageEvent(Event): + """ + Event triggered when a new message is received on a subscribed Google Pub/Sub topic. + """ + + def __init__(self, topic: str, msg, *args, **kwargs): + super().__init__(*args, topic=topic, msg=msg, *args, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/google/pubsub.py b/platypush/plugins/google/pubsub.py new file mode 100644 index 0000000000..cfe3746b6a --- /dev/null +++ b/platypush/plugins/google/pubsub.py @@ -0,0 +1,89 @@ +import json +import os + +from platypush.plugins import Plugin, action + + +class GooglePubsubPlugin(Plugin): + """ + Send messages over a Google pub/sub instance. + You'll need a Google Cloud active project and a set of credentials to use this plugin: + + 1. Create a project on the `Google Cloud console `_ if + you don't have one already. + + 2. In the `Google Cloud API console `_ + create a new service account key. Select "New Service Account", choose the role "Pub/Sub Editor" and leave + the key type as JSON. + + 3. Download the JSON service credentials file. By default platypush will look for the credentials file under + ~/.credentials/platypush/google/pubsub.json. + + Requires: + + * **google-cloud-pubsub** (``pip install google-cloud-pubsub``) + + """ + + publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' + subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber' + default_credentials_file = os.path.join(os.path.expanduser('~'), + '.credentials', 'platypush', 'google', 'pubsub.json') + + def __init__(self, credentials_file: str = default_credentials_file, **kwargs): + """ + :param credentials_file: Path to the JSON credentials file for Google pub/sub (default: + ~/.credentials/platypush/google/pubsub.json) + """ + super().__init__(**kwargs) + self.credentials_file = credentials_file + self.project_id = self.get_project_id() + + def get_project_id(self): + credentials = json.load(open(self.credentials_file)) + return credentials.get('project_id') + + def get_credentials(self, audience: str): + # noinspection PyPackageRequirements + from google.auth import jwt + return jwt.Credentials.from_service_account_file(self.credentials_file, audience=audience) + + @action + def send_message(self, topic: str, msg, **kwargs): + """ + Sends a message to a topic + + :param topic: Topic/channel where the message will be delivered. You can either specify the full topic name in + the format ``projects//topics/``, where ```` must be the ID of your + Google Pub/Sub project, or just ```` - in such case it's implied that you refer to the + ``topic_name`` under the ``project_id`` of your service credentials. + :param msg: Message to be sent. It can be a list, a dict, or a Message object + :param kwargs: Extra arguments to be passed to .publish() + """ + # noinspection PyPackageRequirements + from google.cloud import pubsub_v1 + # noinspection PyPackageRequirements + from google.api_core.exceptions import AlreadyExists + + credentials = self.get_credentials(self.publisher_audience) + publisher = pubsub_v1.PublisherClient(credentials=credentials) + + if not topic.startswith('projects/{}/topics/'.format(self.project_id)): + topic = 'projects/{}/topics/{}'.format(self.project_id, topic) + + try: + publisher.create_topic(topic) + except AlreadyExists: + pass + + if isinstance(msg, int) or isinstance(msg, float): + msg = str(msg) + if isinstance(msg, dict) or isinstance(msg, list): + msg = json.dumps(msg) + if isinstance(msg, str): + msg = msg.encode() + + publisher.publish(topic, msg, **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/requirements.txt b/requirements.txt index 67df6b8090..25ab4c3bd6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -195,3 +195,6 @@ croniter # Support for Trello integration # py-trello +# Support for Google Pub/Sub +# google-cloud-pubsub + diff --git a/setup.py b/setup.py index 26f55d6183..8e27b15d47 100755 --- a/setup.py +++ b/setup.py @@ -256,6 +256,8 @@ setup( 'todoist': ['todoist-python'], # Support for Trello integration 'trello': ['py-trello'], + # Support for Google Pub/Sub + 'google-pubsub': ['google-cloud-pubsub'], }, )