Added Google Pub/Sub integration

This commit is contained in:
Fabio Manganiello 2019-12-30 09:33:26 +01:00
parent 8f2eb1c4e0
commit bc7c248f72
6 changed files with 222 additions and 19 deletions

View file

@ -53,7 +53,8 @@ class Backend(Thread, EventGenerator):
self.bus = bus or Bus() self.bus = bus or Bus()
self.device_id = Config.get('device_id') self.device_id = Config.get('device_id')
self.thread_id = None self.thread_id = None
self._stop = False self._should_stop = False
self._stop_event = threading.Event()
self._kwargs = kwargs self._kwargs = kwargs
self.logger = logging.getLogger(self.__class__.__name__) self.logger = logging.getLogger(self.__class__.__name__)
@ -95,12 +96,11 @@ class Backend(Thread, EventGenerator):
if isinstance(msg, StopEvent) and msg.targets_me(): if isinstance(msg, StopEvent) and msg.targets_me():
self.logger.info('Received STOP event on {}'.format(self.__class__.__name__)) self.logger.info('Received STOP event on {}'.format(self.__class__.__name__))
self._stop = True self._should_stop = True
else: else:
msg.backend = self # Augment message to be able to process responses msg.backend = self # Augment message to be able to process responses
self.bus.post(msg) self.bus.post(msg)
def _is_expected_response(self, msg): def _is_expected_response(self, msg):
""" Internal only - returns true if we are expecting for a response """ Internal only - returns true if we are expecting for a response
and msg is that response """ and msg is that response """
@ -109,12 +109,10 @@ class Backend(Thread, EventGenerator):
and isinstance(msg, Response) \ and isinstance(msg, Response) \
and msg.id == self._request_context['request'].id and msg.id == self._request_context['request'].id
def _get_backend_config(self): def _get_backend_config(self):
config_name = 'backend.' + self.__class__.__name__.split('Backend')[0].lower() config_name = 'backend.' + self.__class__.__name__.split('Backend')[0].lower()
return Config.get(config_name) return Config.get(config_name)
def _setup_response_handler(self, request, on_response, response_timeout): def _setup_response_handler(self, request, on_response, response_timeout):
def _timeout_hndl(): def _timeout_hndl():
raise RuntimeError('Timed out while waiting for a response from {}'. raise RuntimeError('Timed out while waiting for a response from {}'.
@ -135,12 +133,12 @@ class Backend(Thread, EventGenerator):
resp_backend.start() resp_backend.start()
def send_event(self, event, **kwargs): def send_event(self, event, **kwargs):
""" """
Send an event message on the backend. Send an event message on the backend.
:param event: Event to send. It can be a dict, a string/bytes UTF-8 JSON, or a platypush.message.event.Event object. :param event: Event to send. It can be a dict, a string/bytes UTF-8 JSON, or a platypush.message.event.Event
object.
""" """
event = Event.build(event) event = Event.build(event)
@ -152,18 +150,20 @@ class Backend(Thread, EventGenerator):
self.send_message(event, **kwargs) self.send_message(event, **kwargs)
def send_request(self, request, on_response=None, def send_request(self, request, on_response=None,
response_timeout=_default_response_timeout, **kwargs): response_timeout=_default_response_timeout, **kwargs):
""" """
Send a request message on the backend. Send a request message on the backend.
:param request: The request, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.request.Request object. :param request: The request, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.request.Request
object.
:param on_response: Optional callback that will be called when a response is received. If set, this method will synchronously wait for a response before exiting. :param on_response: Optional callback that will be called when a response is received. If set, this method will
synchronously wait for a response before exiting.
:type on_response: function :type on_response: function
:param response_timeout: If on_response is set, the backend will raise an exception if the response isn't received within this number of seconds (default: None) :param response_timeout: If on_response is set, the backend will raise an exception if the response isn't
received within this number of seconds (default: None)
:type response_timeout: float :type response_timeout: float
""" """
@ -177,12 +177,12 @@ class Backend(Thread, EventGenerator):
self.send_message(request, **kwargs) self.send_message(request, **kwargs)
def send_response(self, response, request, **kwargs): def send_response(self, response, request, **kwargs):
""" """
Send a response message on the backend. Send a response message on the backend.
:param response: The response, either a dict, a string/bytes UTF-8 JSON, or a platypush.message.response.Response object :param response: The response, either a dict, a string/bytes UTF-8 JSON, or a
:class:`platypush.message.response.Response` object.
:param request: Associated request, used to set the response parameters that will link them :param request: Associated request, used to set the response parameters that will link them
""" """
@ -191,7 +191,6 @@ class Backend(Thread, EventGenerator):
self.send_message(response, **kwargs) self.send_message(response, **kwargs)
def send_message(self, msg, queue_name=None, **kwargs): def send_message(self, msg, queue_name=None, **kwargs):
""" """
Sends a platypush.message.Message to a node. Sends a platypush.message.Message to a node.
@ -200,7 +199,8 @@ class Backend(Thread, EventGenerator):
other consumers through the configured Redis main queue. other consumers through the configured Redis main queue.
:param msg: The message to send :param msg: The message to send
:param queue_name: Send the message on a specific queue (default: the queue_name configured on the Redis backend) :param queue_name: Send the message on a specific queue (default: the queue_name configured on the Redis
backend)
""" """
try: try:
@ -214,7 +214,6 @@ class Backend(Thread, EventGenerator):
redis.send_message(msg, queue_name=queue_name) redis.send_message(msg, queue_name=queue_name)
def run(self): def run(self):
""" Starts the backend thread. To be implemented in the derived classes """ """ Starts the backend thread. To be implemented in the derived classes """
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()
@ -231,12 +230,16 @@ class Backend(Thread, EventGenerator):
thread_id=self.thread_id) thread_id=self.thread_id)
self.send_message(evt) self.send_message(evt)
self._stop_event.set()
self.on_stop() self.on_stop()
Thread(target=_async_stop).start() Thread(target=_async_stop).start()
def should_stop(self): def should_stop(self):
return self._stop return self._should_stop
def wait_stop(self, timeout=None):
self._stop_event.wait(timeout)
def _get_redis(self): def _get_redis(self):
import redis import redis

View file

@ -0,0 +1,93 @@
import json
from typing import Optional, List
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
from platypush.utils import set_thread_name
class GooglePubsubBackend(Backend):
"""
Subscribe to a list of topics on a Google Pub/Sub instance. See
:class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your
project and credentials file.
Triggers:
* :class:`platypush.message.event.google.pubsub.GooglePubsubMessageEvent` when a new message is received on
a subscribed topic.
Requires:
* **google-cloud-pubsub** (``pip install google-cloud-pubsub``)
"""
def __init__(self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs):
"""
:param topics: List of topics to subscribe. You can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
Google Pub/Sub project, or just ``<topic_name>`` - in such case it's implied that you refer to the
``topic_name`` under the ``project_id`` of your service credentials.
:param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the
``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
"""
super().__init__(*args, **kwargs)
self.topics = topics
if credentials_file:
self.credentials_file = credentials_file
else:
plugin = self._get_plugin()
self.credentials_file = plugin.credentials_file
@staticmethod
def _get_plugin():
return get_plugin('google.pubsub')
def _message_callback(self, topic):
def callback(msg):
data = msg.data.decode()
# noinspection PyBroadException
try:
data = json.loads(data)
except:
pass
msg.ack()
self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
return callback
def run(self):
# noinspection PyPackageRequirements
from google.cloud import pubsub_v1
# noinspection PyPackageRequirements
from google.api_core.exceptions import AlreadyExists
super().run()
set_thread_name('GooglePubSub')
plugin = self._get_plugin()
project_id = plugin.get_project_id()
credentials = plugin.get_credentials(plugin.subscriber_audience)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
for topic in self.topics:
if not topic.startswith('projects/{}/topics/'.format(project_id)):
topic = 'projects/{}/topics/{}'.format(project_id, topic)
subscription_name = '/'.join([*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]])
try:
subscriber.create_subscription(name=subscription_name, topic=topic)
except AlreadyExists:
pass
subscriber.subscribe(subscription_name, self._message_callback(topic))
self.wait_stop()
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,13 @@
from platypush.message.event import Event
class GooglePubsubMessageEvent(Event):
"""
Event triggered when a new message is received on a subscribed Google Pub/Sub topic.
"""
def __init__(self, topic: str, msg, *args, **kwargs):
super().__init__(*args, topic=topic, msg=msg, *args, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -0,0 +1,89 @@
import json
import os
from platypush.plugins import Plugin, action
class GooglePubsubPlugin(Plugin):
"""
Send messages over a Google pub/sub instance.
You'll need a Google Cloud active project and a set of credentials to use this plugin:
1. Create a project on the `Google Cloud console <https://console.cloud.google.com/projectcreate>`_ if
you don't have one already.
2. In the `Google Cloud API console <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
create a new service account key. Select "New Service Account", choose the role "Pub/Sub Editor" and leave
the key type as JSON.
3. Download the JSON service credentials file. By default platypush will look for the credentials file under
~/.credentials/platypush/google/pubsub.json.
Requires:
* **google-cloud-pubsub** (``pip install google-cloud-pubsub``)
"""
publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
default_credentials_file = os.path.join(os.path.expanduser('~'),
'.credentials', 'platypush', 'google', 'pubsub.json')
def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
"""
:param credentials_file: Path to the JSON credentials file for Google pub/sub (default:
~/.credentials/platypush/google/pubsub.json)
"""
super().__init__(**kwargs)
self.credentials_file = credentials_file
self.project_id = self.get_project_id()
def get_project_id(self):
credentials = json.load(open(self.credentials_file))
return credentials.get('project_id')
def get_credentials(self, audience: str):
# noinspection PyPackageRequirements
from google.auth import jwt
return jwt.Credentials.from_service_account_file(self.credentials_file, audience=audience)
@action
def send_message(self, topic: str, msg, **kwargs):
"""
Sends a message to a topic
:param topic: Topic/channel where the message will be delivered. You can either specify the full topic name in
the format ``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
Google Pub/Sub project, or just ``<topic_name>`` - in such case it's implied that you refer to the
``topic_name`` under the ``project_id`` of your service credentials.
:param msg: Message to be sent. It can be a list, a dict, or a Message object
:param kwargs: Extra arguments to be passed to .publish()
"""
# noinspection PyPackageRequirements
from google.cloud import pubsub_v1
# noinspection PyPackageRequirements
from google.api_core.exceptions import AlreadyExists
credentials = self.get_credentials(self.publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials)
if not topic.startswith('projects/{}/topics/'.format(self.project_id)):
topic = 'projects/{}/topics/{}'.format(self.project_id, topic)
try:
publisher.create_topic(topic)
except AlreadyExists:
pass
if isinstance(msg, int) or isinstance(msg, float):
msg = str(msg)
if isinstance(msg, dict) or isinstance(msg, list):
msg = json.dumps(msg)
if isinstance(msg, str):
msg = msg.encode()
publisher.publish(topic, msg, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -195,3 +195,6 @@ croniter
# Support for Trello integration # Support for Trello integration
# py-trello # py-trello
# Support for Google Pub/Sub
# google-cloud-pubsub

View file

@ -256,6 +256,8 @@ setup(
'todoist': ['todoist-python'], 'todoist': ['todoist-python'],
# Support for Trello integration # Support for Trello integration
'trello': ['py-trello'], 'trello': ['py-trello'],
# Support for Google Pub/Sub
'google-pubsub': ['google-cloud-pubsub'],
}, },
) )