2019-12-30 09:33:26 +01:00
|
|
|
import json
|
|
|
|
import os
|
|
|
|
|
|
|
|
from platypush.plugins import Plugin, action
|
|
|
|
|
|
|
|
|
|
|
|
class GooglePubsubPlugin(Plugin):
|
|
|
|
"""
|
|
|
|
Send messages over a Google pub/sub instance.
|
|
|
|
You'll need a Google Cloud active project and a set of credentials to use this plugin:
|
|
|
|
|
2023-10-01 15:37:20 +02:00
|
|
|
1. Create a project on the `Google Cloud console
|
|
|
|
<https://console.cloud.google.com/projectcreate>`_ if you don't have
|
|
|
|
one already.
|
2019-12-30 09:33:26 +01:00
|
|
|
|
2023-10-01 15:37:20 +02:00
|
|
|
2. In the `Google Cloud API console
|
|
|
|
<https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
|
|
|
|
create a new service account key. Select "New Service Account", choose
|
|
|
|
the role "Pub/Sub Editor" and leave the key type as JSON.
|
2019-12-30 09:33:26 +01:00
|
|
|
|
2023-10-01 15:37:20 +02:00
|
|
|
3. Download the JSON service credentials file. By default Platypush
|
|
|
|
will look for the credentials file under
|
|
|
|
``~/.credentials/platypush/google/pubsub.json``.
|
2019-12-30 09:33:26 +01:00
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
|
|
|
|
subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
|
2023-09-24 16:54:43 +02:00
|
|
|
default_credentials_file = os.path.join(
|
|
|
|
os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json'
|
|
|
|
)
|
2019-12-30 09:33:26 +01:00
|
|
|
|
|
|
|
def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
|
|
|
|
"""
|
2023-10-01 15:37:20 +02:00
|
|
|
:param credentials_file: Path to the JSON credentials file for Google
|
|
|
|
pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
|
2019-12-30 09:33:26 +01:00
|
|
|
"""
|
|
|
|
super().__init__(**kwargs)
|
|
|
|
self.credentials_file = credentials_file
|
|
|
|
self.project_id = self.get_project_id()
|
|
|
|
|
|
|
|
def get_project_id(self):
|
2023-09-24 16:54:43 +02:00
|
|
|
with open(self.credentials_file) as f:
|
|
|
|
return json.load(f).get('project_id')
|
2019-12-30 09:33:26 +01:00
|
|
|
|
|
|
|
def get_credentials(self, audience: str):
|
|
|
|
from google.auth import jwt
|
2023-09-24 16:54:43 +02:00
|
|
|
|
|
|
|
return jwt.Credentials.from_service_account_file(
|
|
|
|
self.credentials_file, audience=audience
|
|
|
|
)
|
2019-12-30 09:33:26 +01:00
|
|
|
|
|
|
|
@action
|
|
|
|
def send_message(self, topic: str, msg, **kwargs):
|
|
|
|
"""
|
|
|
|
Sends a message to a topic
|
|
|
|
|
2023-10-01 15:37:20 +02:00
|
|
|
:param topic: Topic/channel where the message will be delivered. You
|
|
|
|
can either specify the full topic name in the format
|
|
|
|
``projects/<project_id>/topics/<topic_name>``, where
|
|
|
|
``<project_id>`` must be the ID of your Google Pub/Sub project, or
|
|
|
|
just ``<topic_name>`` - in such case it's implied that you refer to
|
|
|
|
the ``topic_name`` under the ``project_id`` of your service
|
|
|
|
credentials.
|
2019-12-30 09:33:26 +01:00
|
|
|
:param msg: Message to be sent. It can be a list, a dict, or a Message object
|
|
|
|
:param kwargs: Extra arguments to be passed to .publish()
|
|
|
|
"""
|
|
|
|
from google.cloud import pubsub_v1
|
|
|
|
from google.api_core.exceptions import AlreadyExists
|
|
|
|
|
|
|
|
credentials = self.get_credentials(self.publisher_audience)
|
|
|
|
publisher = pubsub_v1.PublisherClient(credentials=credentials)
|
|
|
|
|
2023-10-01 15:37:20 +02:00
|
|
|
if not topic.startswith(f'projects/{self.project_id}/topics/'):
|
|
|
|
topic = f'projects/{self.project_id}/topics/{topic}'
|
2019-12-30 09:33:26 +01:00
|
|
|
|
|
|
|
try:
|
|
|
|
publisher.create_topic(topic)
|
|
|
|
except AlreadyExists:
|
|
|
|
pass
|
|
|
|
|
2023-09-24 16:54:43 +02:00
|
|
|
if isinstance(msg, (int, float)):
|
2019-12-30 09:33:26 +01:00
|
|
|
msg = str(msg)
|
2023-09-24 16:54:43 +02:00
|
|
|
if isinstance(msg, (dict, list)):
|
2019-12-30 09:33:26 +01:00
|
|
|
msg = json.dumps(msg)
|
|
|
|
if isinstance(msg, str):
|
|
|
|
msg = msg.encode()
|
|
|
|
|
|
|
|
publisher.publish(topic, msg, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|