From eb47f9ded0413b350f495a24041cbabc9b99f1f0 Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <fabio@manganiello.tech>
Date: Fri, 19 Jan 2024 03:11:55 +0100
Subject: [PATCH] [#351] Merged `google.pubsub` plugin and backend.

Closes: #351
---
 docs/source/backends.rst                      |   1 -
 .../platypush/backend/google.pubsub.rst       |   5 -
 platypush/backend/google/__init__.py          |   0
 platypush/backend/google/pubsub/__init__.py   |  88 ---------
 platypush/backend/google/pubsub/manifest.yaml |   9 -
 platypush/plugins/google/pubsub/__init__.py   | 171 +++++++++++++++---
 platypush/plugins/google/pubsub/manifest.yaml |   3 +-
 platypush/utils/mock/modules.py               |   3 +
 8 files changed, 150 insertions(+), 130 deletions(-)
 delete mode 100644 docs/source/platypush/backend/google.pubsub.rst
 delete mode 100644 platypush/backend/google/__init__.py
 delete mode 100644 platypush/backend/google/pubsub/__init__.py
 delete mode 100644 platypush/backend/google/pubsub/manifest.yaml

diff --git a/docs/source/backends.rst b/docs/source/backends.rst
index 3a152791e..8b29fb7ff 100644
--- a/docs/source/backends.rst
+++ b/docs/source/backends.rst
@@ -10,7 +10,6 @@ Backends
     platypush/backend/button.flic.rst
     platypush/backend/camera.pi.rst
     platypush/backend/chat.telegram.rst
-    platypush/backend/google.pubsub.rst
     platypush/backend/gps.rst
     platypush/backend/http.rst
     platypush/backend/mail.rst
diff --git a/docs/source/platypush/backend/google.pubsub.rst b/docs/source/platypush/backend/google.pubsub.rst
deleted file mode 100644
index 76bffd3cd..000000000
--- a/docs/source/platypush/backend/google.pubsub.rst
+++ /dev/null
@@ -1,5 +0,0 @@
-``google.pubsub``
-===================================
-
-.. automodule:: platypush.backend.google.pubsub
-    :members:
diff --git a/platypush/backend/google/__init__.py b/platypush/backend/google/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/platypush/backend/google/pubsub/__init__.py b/platypush/backend/google/pubsub/__init__.py
deleted file mode 100644
index 39a6d3e1a..000000000
--- a/platypush/backend/google/pubsub/__init__.py
+++ /dev/null
@@ -1,88 +0,0 @@
-import json
-
-from typing import Optional, List
-
-from platypush.backend import Backend
-from platypush.context import get_plugin
-from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
-
-
-class GooglePubsubBackend(Backend):
-    """
-    Subscribe to a list of topics on a Google Pub/Sub instance. See
-    :class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your
-    project and credentials file.
-    """
-
-    def __init__(
-        self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs
-    ):
-        """
-        :param topics: List of topics to subscribe. You can either specify the full topic name in the format
-            ``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
-            Google Pub/Sub project, or just ``<topic_name>``  - in such case it's implied that you refer to the
-            ``topic_name`` under the ``project_id`` of your service credentials.
-        :param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the
-            ``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
-        """
-
-        super().__init__(*args, name='GooglePubSub', **kwargs)
-        self.topics = topics
-
-        if credentials_file:
-            self.credentials_file = credentials_file
-        else:
-            plugin = self._get_plugin()
-            self.credentials_file = plugin.credentials_file
-
-    @staticmethod
-    def _get_plugin():
-        plugin = get_plugin('google.pubsub')
-        assert plugin, 'google.pubsub plugin not enabled'
-        return plugin
-
-    def _message_callback(self, topic):
-        def callback(msg):
-            data = msg.data.decode()
-            try:
-                data = json.loads(data)
-            except Exception as e:
-                self.logger.debug('Not a valid JSON: %s: %s', data, e)
-
-            msg.ack()
-            self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
-
-        return callback
-
-    def run(self):
-        # noinspection PyPackageRequirements
-        from google.cloud import pubsub_v1
-
-        # noinspection PyPackageRequirements
-        from google.api_core.exceptions import AlreadyExists
-
-        super().run()
-        plugin = self._get_plugin()
-        project_id = plugin.get_project_id()
-        credentials = plugin.get_credentials(plugin.subscriber_audience)
-        subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
-
-        for topic in self.topics:
-            prefix = f'projects/{project_id}/topics/'
-            if not topic.startswith(prefix):
-                topic = f'{prefix}{topic}'
-            subscription_name = '/'.join(
-                [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
-            )
-
-            try:
-                subscriber.create_subscription(name=subscription_name, topic=topic)
-            except AlreadyExists:
-                pass
-
-            subscriber.subscribe(subscription_name, self._message_callback(topic))
-
-        self.wait_stop()
-
-
-# vim:sw=4:ts=4:et:
diff --git a/platypush/backend/google/pubsub/manifest.yaml b/platypush/backend/google/pubsub/manifest.yaml
deleted file mode 100644
index c7fd5c41d..000000000
--- a/platypush/backend/google/pubsub/manifest.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-manifest:
-  events:
-    platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message
-      is received ona subscribed topic.
-  install:
-    pip:
-    - google-cloud-pubsub
-  package: platypush.backend.google.pubsub
-  type: backend
diff --git a/platypush/plugins/google/pubsub/__init__.py b/platypush/plugins/google/pubsub/__init__.py
index 1834844dd..7b66b6bee 100644
--- a/platypush/plugins/google/pubsub/__init__.py
+++ b/platypush/plugins/google/pubsub/__init__.py
@@ -1,59 +1,87 @@
 import json
 import os
+from threading import RLock
+from typing import Iterable, Optional
 
-from platypush.plugins import Plugin, action
+from google.auth import jwt
+from google.cloud import pubsub_v1 as pubsub  # pylint: disable=no-name-in-module
+from google.api_core.exceptions import AlreadyExists, NotFound
+
+from platypush.config import Config
+from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
+from platypush.plugins import RunnablePlugin, action
 
 
-class GooglePubsubPlugin(Plugin):
+class GooglePubsubPlugin(RunnablePlugin):
     """
-    Send messages over a Google pub/sub instance.
-    You'll need a Google Cloud active project and a set of credentials to use this plugin:
+    Publishes and subscribes to Google Pub/Sub topics.
+
+    You'll need a Google Cloud active project and a set of credentials to use
+    this plugin:
 
         1. Create a project on the `Google Cloud console
-        <https://console.cloud.google.com/projectcreate>`_ if you don't have
-        one already.
+           <https://console.cloud.google.com/projectcreate>`_ if you don't have
+           one already.
 
         2. In the `Google Cloud API console
-        <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
-        create a new service account key. Select "New Service Account", choose
-        the role "Pub/Sub Editor" and leave the key type as JSON.
+           <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
+           create a new service account key. Select "New Service Account", choose
+           the role "Pub/Sub Editor" and leave the key type as JSON.
 
         3. Download the JSON service credentials file. By default Platypush
-        will look for the credentials file under
-        ``~/.credentials/platypush/google/pubsub.json``.
+           will look for the credentials file under
+           ``<WORKDIR>/credentials/google/pubsub.json``.
 
     """
 
     publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
     subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
     default_credentials_file = os.path.join(
-        os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json'
+        Config.get_workdir(), 'credentials', 'google', 'pubsub.json'
     )
 
-    def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
+    def __init__(
+        self,
+        credentials_file: str = default_credentials_file,
+        topics: Iterable[str] = (),
+        **kwargs,
+    ):
         """
         :param credentials_file: Path to the JSON credentials file for Google
             pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
+        :param topics: List of topics to subscribe. You can either specify the
+            full topic name in the format
+            ``projects/<project_id>/topics/<topic_name>``, where
+            ``<project_id>`` must be the ID of your Google Pub/Sub project, or
+            just ``<topic_name>`` - in such case it's implied that you refer to
+            the ``topic_name`` under the ``project_id`` of your service
+            credentials.
         """
         super().__init__(**kwargs)
         self.credentials_file = credentials_file
         self.project_id = self.get_project_id()
+        self.topics = topics
+        self._subscriber: Optional[pubsub.SubscriberClient] = None
+        self._subscriber_lock = RLock()
 
     def get_project_id(self):
         with open(self.credentials_file) as f:
             return json.load(f).get('project_id')
 
     def get_credentials(self, audience: str):
-        from google.auth import jwt
-
         return jwt.Credentials.from_service_account_file(
             self.credentials_file, audience=audience
         )
 
+    def _norm_topic(self, topic: str):
+        if not topic.startswith(f'projects/{self.project_id}/topics/'):
+            topic = f'projects/{self.project_id}/topics/{topic}'
+        return topic
+
     @action
-    def send_message(self, topic: str, msg, **kwargs):
+    def publish(self, topic: str, msg, **kwargs):
         """
-        Sends a message to a topic
+        Publish a message to a topic
 
         :param topic: Topic/channel where the message will be delivered. You
             can either specify the full topic name in the format
@@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin):
         :param msg: Message to be sent. It can be a list, a dict, or a Message object
         :param kwargs: Extra arguments to be passed to .publish()
         """
-        from google.cloud import pubsub_v1
-        from google.api_core.exceptions import AlreadyExists
-
         credentials = self.get_credentials(self.publisher_audience)
-        publisher = pubsub_v1.PublisherClient(credentials=credentials)
-
-        if not topic.startswith(f'projects/{self.project_id}/topics/'):
-            topic = f'projects/{self.project_id}/topics/{topic}'
+        publisher = pubsub.PublisherClient(credentials=credentials)
+        topic = self._norm_topic(topic)
 
         try:
-            publisher.create_topic(topic)
+            publisher.create_topic(name=topic)
         except AlreadyExists:
-            pass
+            self.logger.debug('Topic %s already exists', topic)
 
         if isinstance(msg, (int, float)):
             msg = str(msg)
@@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin):
 
         publisher.publish(topic, msg, **kwargs)
 
+    @action
+    def send_message(self, topic: str, msg, **kwargs):
+        """
+        Alias for :meth:`.publish`
+        """
+        self.publish(topic=topic, msg=msg, **kwargs)
+
+    @action
+    def subscribe(self, topic: str):
+        """
+        Subscribe to a topic.
+
+        :param topic: Topic/channel where the message will be delivered. You
+            can either specify the full topic name in the format
+            ``projects/<project_id>/topics/<topic_name>``, where
+            ``<project_id>`` must be the ID of your Google Pub/Sub project, or
+            just ``<topic_name>`` - in such case it's implied that you refer to
+            the ``topic_name`` under the ``project_id`` of your service
+            credentials.
+        """
+        assert self._subscriber, 'Subscriber not initialized'
+        topic = self._norm_topic(topic)
+        subscription_name = '/'.join(
+            [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
+        )
+
+        try:
+            self._subscriber.create_subscription(name=subscription_name, topic=topic)
+        except AlreadyExists:
+            self.logger.debug('Subscription %s already exists', subscription_name)
+
+        self._subscriber.subscribe(subscription_name, self._message_callback(topic))
+
+    @action
+    def unsubscribe(self, topic: str):
+        """
+        Unsubscribe from a topic.
+
+        :param topic: Topic/channel where the message will be delivered. You
+            can either specify the full topic name in the format
+            ``projects/<project_id>/topics/<topic_name>``, where
+            ``<project_id>`` must be the ID of your Google Pub/Sub project, or
+            just ``<topic_name>`` - in such case it's implied that you refer to
+            the ``topic_name`` under the ``project_id`` of your service
+            credentials.
+        """
+        assert self._subscriber, 'Subscriber not initialized'
+        topic = self._norm_topic(topic)
+        subscription_name = '/'.join(
+            [*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
+        )
+
+        try:
+            self._subscriber.delete_subscription(subscription=subscription_name)
+        except NotFound:
+            self.logger.debug('Subscription %s not found', subscription_name)
+
+    def _message_callback(self, topic):
+        def callback(msg):
+            data = msg.data.decode()
+            try:
+                data = json.loads(data)
+            except Exception as e:
+                self.logger.debug('Not a valid JSON: %s: %s', data, e)
+
+            msg.ack()
+            self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
+
+        return callback
+
+    def main(self):
+        credentials = self.get_credentials(self.subscriber_audience)
+        with self._subscriber_lock:
+            self._subscriber = pubsub.SubscriberClient(credentials=credentials)
+
+        for topic in self.topics:
+            self.subscribe(topic=topic)
+
+        self.wait_stop()
+        with self._subscriber_lock:
+            self._close()
+
+    def _close(self):
+        with self._subscriber_lock:
+            if self._subscriber:
+                try:
+                    self._subscriber.close()
+                except Exception as e:
+                    self.logger.debug('Error while closing the subscriber: %s', e)
+
+                self._subscriber = None
+
+    def stop(self):
+        self._close()
+        super().stop()
+
 
 # vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/google/pubsub/manifest.yaml b/platypush/plugins/google/pubsub/manifest.yaml
index 360fba64e..b16e30bc7 100644
--- a/platypush/plugins/google/pubsub/manifest.yaml
+++ b/platypush/plugins/google/pubsub/manifest.yaml
@@ -1,5 +1,6 @@
 manifest:
-  events: {}
+  events:
+    - platypush.message.event.google.pubsub.GooglePubsubMessageEvent
   install:
     apk:
       - py3-google-api-python-client
diff --git a/platypush/utils/mock/modules.py b/platypush/utils/mock/modules.py
index 1a1921700..6af743558 100644
--- a/platypush/utils/mock/modules.py
+++ b/platypush/utils/mock/modules.py
@@ -35,10 +35,13 @@ mock_imports = [
     "gi",
     "gi.repository",
     "google",
+    "google.api_core",
+    "google.auth",
     "google.assistant.embedded",
     "google.assistant.library",
     "google.assistant.library.event",
     "google.assistant.library.file_helpers",
+    "google.cloud",
     "google.oauth2.credentials",
     "googlesamples",
     "googlesamples.assistant.grpc.audio_helpers",