From 8a84a36905c7e146a24d13cd4109a7fac0a77dab Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Fri, 19 Jan 2024 03:11:55 +0100 Subject: [PATCH] [#351] Merged `google.pubsub` plugin and backend. Closes: #351 --- docs/source/backends.rst | 1 - .../platypush/backend/google.pubsub.rst | 5 - platypush/backend/google/__init__.py | 0 platypush/backend/google/pubsub/__init__.py | 88 --------- platypush/backend/google/pubsub/manifest.yaml | 9 - platypush/plugins/google/pubsub/__init__.py | 171 +++++++++++++++--- platypush/plugins/google/pubsub/manifest.yaml | 3 +- platypush/utils/mock/modules.py | 3 + 8 files changed, 150 insertions(+), 130 deletions(-) delete mode 100644 docs/source/platypush/backend/google.pubsub.rst delete mode 100644 platypush/backend/google/__init__.py delete mode 100644 platypush/backend/google/pubsub/__init__.py delete mode 100644 platypush/backend/google/pubsub/manifest.yaml diff --git a/docs/source/backends.rst b/docs/source/backends.rst index 3a152791..8b29fb7f 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -10,7 +10,6 @@ Backends platypush/backend/button.flic.rst platypush/backend/camera.pi.rst platypush/backend/chat.telegram.rst - platypush/backend/google.pubsub.rst platypush/backend/gps.rst platypush/backend/http.rst platypush/backend/mail.rst diff --git a/docs/source/platypush/backend/google.pubsub.rst b/docs/source/platypush/backend/google.pubsub.rst deleted file mode 100644 index 76bffd3c..00000000 --- a/docs/source/platypush/backend/google.pubsub.rst +++ /dev/null @@ -1,5 +0,0 @@ -``google.pubsub`` -=================================== - -.. automodule:: platypush.backend.google.pubsub - :members: diff --git a/platypush/backend/google/__init__.py b/platypush/backend/google/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/platypush/backend/google/pubsub/__init__.py b/platypush/backend/google/pubsub/__init__.py deleted file mode 100644 index 39a6d3e1..00000000 --- a/platypush/backend/google/pubsub/__init__.py +++ /dev/null @@ -1,88 +0,0 @@ -import json - -from typing import Optional, List - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.google.pubsub import GooglePubsubMessageEvent - - -class GooglePubsubBackend(Backend): - """ - Subscribe to a list of topics on a Google Pub/Sub instance. See - :class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your - project and credentials file. - """ - - def __init__( - self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs - ): - """ - :param topics: List of topics to subscribe. You can either specify the full topic name in the format - ``projects//topics/``, where ```` must be the ID of your - Google Pub/Sub project, or just ```` - in such case it's implied that you refer to the - ``topic_name`` under the ``project_id`` of your service credentials. - :param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the - ``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``). - """ - - super().__init__(*args, name='GooglePubSub', **kwargs) - self.topics = topics - - if credentials_file: - self.credentials_file = credentials_file - else: - plugin = self._get_plugin() - self.credentials_file = plugin.credentials_file - - @staticmethod - def _get_plugin(): - plugin = get_plugin('google.pubsub') - assert plugin, 'google.pubsub plugin not enabled' - return plugin - - def _message_callback(self, topic): - def callback(msg): - data = msg.data.decode() - try: - data = json.loads(data) - except Exception as e: - self.logger.debug('Not a valid JSON: %s: %s', data, e) - - msg.ack() - self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data)) - - return callback - - def run(self): - # noinspection PyPackageRequirements - from google.cloud import pubsub_v1 - - # noinspection PyPackageRequirements - from google.api_core.exceptions import AlreadyExists - - super().run() - plugin = self._get_plugin() - project_id = plugin.get_project_id() - credentials = plugin.get_credentials(plugin.subscriber_audience) - subscriber = pubsub_v1.SubscriberClient(credentials=credentials) - - for topic in self.topics: - prefix = f'projects/{project_id}/topics/' - if not topic.startswith(prefix): - topic = f'{prefix}{topic}' - subscription_name = '/'.join( - [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] - ) - - try: - subscriber.create_subscription(name=subscription_name, topic=topic) - except AlreadyExists: - pass - - subscriber.subscribe(subscription_name, self._message_callback(topic)) - - self.wait_stop() - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/google/pubsub/manifest.yaml b/platypush/backend/google/pubsub/manifest.yaml deleted file mode 100644 index c7fd5c41..00000000 --- a/platypush/backend/google/pubsub/manifest.yaml +++ /dev/null @@ -1,9 +0,0 @@ -manifest: - events: - platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message - is received ona subscribed topic. - install: - pip: - - google-cloud-pubsub - package: platypush.backend.google.pubsub - type: backend diff --git a/platypush/plugins/google/pubsub/__init__.py b/platypush/plugins/google/pubsub/__init__.py index 1834844d..7b66b6be 100644 --- a/platypush/plugins/google/pubsub/__init__.py +++ b/platypush/plugins/google/pubsub/__init__.py @@ -1,59 +1,87 @@ import json import os +from threading import RLock +from typing import Iterable, Optional -from platypush.plugins import Plugin, action +from google.auth import jwt +from google.cloud import pubsub_v1 as pubsub # pylint: disable=no-name-in-module +from google.api_core.exceptions import AlreadyExists, NotFound + +from platypush.config import Config +from platypush.message.event.google.pubsub import GooglePubsubMessageEvent +from platypush.plugins import RunnablePlugin, action -class GooglePubsubPlugin(Plugin): +class GooglePubsubPlugin(RunnablePlugin): """ - Send messages over a Google pub/sub instance. - You'll need a Google Cloud active project and a set of credentials to use this plugin: + Publishes and subscribes to Google Pub/Sub topics. + + You'll need a Google Cloud active project and a set of credentials to use + this plugin: 1. Create a project on the `Google Cloud console - `_ if you don't have - one already. + `_ if you don't have + one already. 2. In the `Google Cloud API console - `_ - create a new service account key. Select "New Service Account", choose - the role "Pub/Sub Editor" and leave the key type as JSON. + `_ + create a new service account key. Select "New Service Account", choose + the role "Pub/Sub Editor" and leave the key type as JSON. 3. Download the JSON service credentials file. By default Platypush - will look for the credentials file under - ``~/.credentials/platypush/google/pubsub.json``. + will look for the credentials file under + ``/credentials/google/pubsub.json``. """ publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber' default_credentials_file = os.path.join( - os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json' + Config.get_workdir(), 'credentials', 'google', 'pubsub.json' ) - def __init__(self, credentials_file: str = default_credentials_file, **kwargs): + def __init__( + self, + credentials_file: str = default_credentials_file, + topics: Iterable[str] = (), + **kwargs, + ): """ :param credentials_file: Path to the JSON credentials file for Google pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``) + :param topics: List of topics to subscribe. You can either specify the + full topic name in the format + ``projects//topics/``, where + ```` must be the ID of your Google Pub/Sub project, or + just ```` - in such case it's implied that you refer to + the ``topic_name`` under the ``project_id`` of your service + credentials. """ super().__init__(**kwargs) self.credentials_file = credentials_file self.project_id = self.get_project_id() + self.topics = topics + self._subscriber: Optional[pubsub.SubscriberClient] = None + self._subscriber_lock = RLock() def get_project_id(self): with open(self.credentials_file) as f: return json.load(f).get('project_id') def get_credentials(self, audience: str): - from google.auth import jwt - return jwt.Credentials.from_service_account_file( self.credentials_file, audience=audience ) + def _norm_topic(self, topic: str): + if not topic.startswith(f'projects/{self.project_id}/topics/'): + topic = f'projects/{self.project_id}/topics/{topic}' + return topic + @action - def send_message(self, topic: str, msg, **kwargs): + def publish(self, topic: str, msg, **kwargs): """ - Sends a message to a topic + Publish a message to a topic :param topic: Topic/channel where the message will be delivered. You can either specify the full topic name in the format @@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin): :param msg: Message to be sent. It can be a list, a dict, or a Message object :param kwargs: Extra arguments to be passed to .publish() """ - from google.cloud import pubsub_v1 - from google.api_core.exceptions import AlreadyExists - credentials = self.get_credentials(self.publisher_audience) - publisher = pubsub_v1.PublisherClient(credentials=credentials) - - if not topic.startswith(f'projects/{self.project_id}/topics/'): - topic = f'projects/{self.project_id}/topics/{topic}' + publisher = pubsub.PublisherClient(credentials=credentials) + topic = self._norm_topic(topic) try: - publisher.create_topic(topic) + publisher.create_topic(name=topic) except AlreadyExists: - pass + self.logger.debug('Topic %s already exists', topic) if isinstance(msg, (int, float)): msg = str(msg) @@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin): publisher.publish(topic, msg, **kwargs) + @action + def send_message(self, topic: str, msg, **kwargs): + """ + Alias for :meth:`.publish` + """ + self.publish(topic=topic, msg=msg, **kwargs) + + @action + def subscribe(self, topic: str): + """ + Subscribe to a topic. + + :param topic: Topic/channel where the message will be delivered. You + can either specify the full topic name in the format + ``projects//topics/``, where + ```` must be the ID of your Google Pub/Sub project, or + just ```` - in such case it's implied that you refer to + the ``topic_name`` under the ``project_id`` of your service + credentials. + """ + assert self._subscriber, 'Subscriber not initialized' + topic = self._norm_topic(topic) + subscription_name = '/'.join( + [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] + ) + + try: + self._subscriber.create_subscription(name=subscription_name, topic=topic) + except AlreadyExists: + self.logger.debug('Subscription %s already exists', subscription_name) + + self._subscriber.subscribe(subscription_name, self._message_callback(topic)) + + @action + def unsubscribe(self, topic: str): + """ + Unsubscribe from a topic. + + :param topic: Topic/channel where the message will be delivered. You + can either specify the full topic name in the format + ``projects//topics/``, where + ```` must be the ID of your Google Pub/Sub project, or + just ```` - in such case it's implied that you refer to + the ``topic_name`` under the ``project_id`` of your service + credentials. + """ + assert self._subscriber, 'Subscriber not initialized' + topic = self._norm_topic(topic) + subscription_name = '/'.join( + [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]] + ) + + try: + self._subscriber.delete_subscription(subscription=subscription_name) + except NotFound: + self.logger.debug('Subscription %s not found', subscription_name) + + def _message_callback(self, topic): + def callback(msg): + data = msg.data.decode() + try: + data = json.loads(data) + except Exception as e: + self.logger.debug('Not a valid JSON: %s: %s', data, e) + + msg.ack() + self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data)) + + return callback + + def main(self): + credentials = self.get_credentials(self.subscriber_audience) + with self._subscriber_lock: + self._subscriber = pubsub.SubscriberClient(credentials=credentials) + + for topic in self.topics: + self.subscribe(topic=topic) + + self.wait_stop() + with self._subscriber_lock: + self._close() + + def _close(self): + with self._subscriber_lock: + if self._subscriber: + try: + self._subscriber.close() + except Exception as e: + self.logger.debug('Error while closing the subscriber: %s', e) + + self._subscriber = None + + def stop(self): + self._close() + super().stop() + # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/google/pubsub/manifest.yaml b/platypush/plugins/google/pubsub/manifest.yaml index 360fba64..b16e30bc 100644 --- a/platypush/plugins/google/pubsub/manifest.yaml +++ b/platypush/plugins/google/pubsub/manifest.yaml @@ -1,5 +1,6 @@ manifest: - events: {} + events: + - platypush.message.event.google.pubsub.GooglePubsubMessageEvent install: apk: - py3-google-api-python-client diff --git a/platypush/utils/mock/modules.py b/platypush/utils/mock/modules.py index 1a192170..6af74355 100644 --- a/platypush/utils/mock/modules.py +++ b/platypush/utils/mock/modules.py @@ -35,10 +35,13 @@ mock_imports = [ "gi", "gi.repository", "google", + "google.api_core", + "google.auth", "google.assistant.embedded", "google.assistant.library", "google.assistant.library.event", "google.assistant.library.file_helpers", + "google.cloud", "google.oauth2.credentials", "googlesamples", "googlesamples.assistant.grpc.audio_helpers",