[#351] Merged google.pubsub plugin and backend.

Closes: #351
This commit is contained in:
Fabio Manganiello 2024-01-19 03:11:55 +01:00
parent 2e9cb44caf
commit eb47f9ded0
8 changed files with 150 additions and 130 deletions

View file

@ -10,7 +10,6 @@ Backends

View file

@ -1,5 +0,0 @@
.. automodule:: platypush.backend.google.pubsub

View file

@ -1,88 +0,0 @@
import json
from typing import Optional, List
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
class GooglePubsubBackend(Backend):
Subscribe to a list of topics on a Google Pub/Sub instance. See
:class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your
project and credentials file.
def __init__(
self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs
:param topics: List of topics to subscribe. You can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
Google Pub/Sub project, or just ``<topic_name>`` - in such case it's implied that you refer to the
``topic_name`` under the ``project_id`` of your service credentials.
:param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the
``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
super().__init__(*args, name='GooglePubSub', **kwargs)
self.topics = topics
if credentials_file:
self.credentials_file = credentials_file
plugin = self._get_plugin()
self.credentials_file = plugin.credentials_file
def _get_plugin():
plugin = get_plugin('google.pubsub')
assert plugin, 'google.pubsub plugin not enabled'
return plugin
def _message_callback(self, topic):
def callback(msg):
data = msg.data.decode()
data = json.loads(data)
except Exception as e:
self.logger.debug('Not a valid JSON: %s: %s', data, e)
self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
return callback
def run(self):
# noinspection PyPackageRequirements
from google.cloud import pubsub_v1
# noinspection PyPackageRequirements
from google.api_core.exceptions import AlreadyExists
plugin = self._get_plugin()
project_id = plugin.get_project_id()
credentials = plugin.get_credentials(plugin.subscriber_audience)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
for topic in self.topics:
prefix = f'projects/{project_id}/topics/'
if not topic.startswith(prefix):
topic = f'{prefix}{topic}'
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
subscriber.create_subscription(name=subscription_name, topic=topic)
except AlreadyExists:
subscriber.subscribe(subscription_name, self._message_callback(topic))
# vim:sw=4:ts=4:et:

View file

@ -1,9 +0,0 @@
platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message
is received ona subscribed topic.
- google-cloud-pubsub
package: platypush.backend.google.pubsub
type: backend

View file

@ -1,59 +1,87 @@
import json
import os
from threading import RLock
from typing import Iterable, Optional
from platypush.plugins import Plugin, action
from google.auth import jwt
from google.cloud import pubsub_v1 as pubsub # pylint: disable=no-name-in-module
from google.api_core.exceptions import AlreadyExists, NotFound
from platypush.config import Config
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
from platypush.plugins import RunnablePlugin, action
class GooglePubsubPlugin(Plugin):
class GooglePubsubPlugin(RunnablePlugin):
Send messages over a Google pub/sub instance.
You'll need a Google Cloud active project and a set of credentials to use this plugin:
Publishes and subscribes to Google Pub/Sub topics.
You'll need a Google Cloud active project and a set of credentials to use
this plugin:
1. Create a project on the `Google Cloud console
<https://console.cloud.google.com/projectcreate>`_ if you don't have
one already.
<https://console.cloud.google.com/projectcreate>`_ if you don't have
one already.
2. In the `Google Cloud API console
create a new service account key. Select "New Service Account", choose
the role "Pub/Sub Editor" and leave the key type as JSON.
create a new service account key. Select "New Service Account", choose
the role "Pub/Sub Editor" and leave the key type as JSON.
3. Download the JSON service credentials file. By default Platypush
will look for the credentials file under
will look for the credentials file under
publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
default_credentials_file = os.path.join(
os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json'
Config.get_workdir(), 'credentials', 'google', 'pubsub.json'
def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
def __init__(
credentials_file: str = default_credentials_file,
topics: Iterable[str] = (),
:param credentials_file: Path to the JSON credentials file for Google
pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
:param topics: List of topics to subscribe. You can either specify the
full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
self.credentials_file = credentials_file
self.project_id = self.get_project_id()
self.topics = topics
self._subscriber: Optional[pubsub.SubscriberClient] = None
self._subscriber_lock = RLock()
def get_project_id(self):
with open(self.credentials_file) as f:
return json.load(f).get('project_id')
def get_credentials(self, audience: str):
from google.auth import jwt
return jwt.Credentials.from_service_account_file(
self.credentials_file, audience=audience
def _norm_topic(self, topic: str):
if not topic.startswith(f'projects/{self.project_id}/topics/'):
topic = f'projects/{self.project_id}/topics/{topic}'
return topic
def send_message(self, topic: str, msg, **kwargs):
def publish(self, topic: str, msg, **kwargs):
Sends a message to a topic
Publish a message to a topic
:param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format
@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin):
:param msg: Message to be sent. It can be a list, a dict, or a Message object
:param kwargs: Extra arguments to be passed to .publish()
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists
credentials = self.get_credentials(self.publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials)
if not topic.startswith(f'projects/{self.project_id}/topics/'):
topic = f'projects/{self.project_id}/topics/{topic}'
publisher = pubsub.PublisherClient(credentials=credentials)
topic = self._norm_topic(topic)
except AlreadyExists:
self.logger.debug('Topic %s already exists', topic)
if isinstance(msg, (int, float)):
msg = str(msg)
@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin):
publisher.publish(topic, msg, **kwargs)
def send_message(self, topic: str, msg, **kwargs):
Alias for :meth:`.publish`
self.publish(topic=topic, msg=msg, **kwargs)
def subscribe(self, topic: str):
Subscribe to a topic.
:param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
assert self._subscriber, 'Subscriber not initialized'
topic = self._norm_topic(topic)
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
self._subscriber.create_subscription(name=subscription_name, topic=topic)
except AlreadyExists:
self.logger.debug('Subscription %s already exists', subscription_name)
self._subscriber.subscribe(subscription_name, self._message_callback(topic))
def unsubscribe(self, topic: str):
Unsubscribe from a topic.
:param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
assert self._subscriber, 'Subscriber not initialized'
topic = self._norm_topic(topic)
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
except NotFound:
self.logger.debug('Subscription %s not found', subscription_name)
def _message_callback(self, topic):
def callback(msg):
data = msg.data.decode()
data = json.loads(data)
except Exception as e:
self.logger.debug('Not a valid JSON: %s: %s', data, e)
self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
return callback
def main(self):
credentials = self.get_credentials(self.subscriber_audience)
with self._subscriber_lock:
self._subscriber = pubsub.SubscriberClient(credentials=credentials)
for topic in self.topics:
with self._subscriber_lock:
def _close(self):
with self._subscriber_lock:
if self._subscriber:
except Exception as e:
self.logger.debug('Error while closing the subscriber: %s', e)
self._subscriber = None
def stop(self):
# vim:sw=4:ts=4:et:

View file

@ -1,5 +1,6 @@
events: {}
- platypush.message.event.google.pubsub.GooglePubsubMessageEvent
- py3-google-api-python-client

View file

@ -35,10 +35,13 @@ mock_imports = [