Compare commits

...

5 commits

Author SHA1 Message Date
d82a5ecb1e
[#356] Merged adafruit.io plugin and backend.
All checks were successful
continuous-integration/drone/push Build is passing
2024-01-19 21:55:20 +01:00
01e8296406 Merge pull request 'Merged google.pubsub plugin and backend.' (#355) from 351-migrate-google-pubsub into master
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #355
2024-01-19 03:14:32 +01:00
89c5283ee9 Merge branch 'master' into 351-migrate-google-pubsub
All checks were successful
continuous-integration/drone/push Build is passing
2024-01-19 03:13:09 +01:00
8a84a36905
[#351] Merged google.pubsub plugin and backend.
All checks were successful
continuous-integration/drone/push Build is passing
Closes: #351
2024-01-19 03:11:55 +01:00
2e9cb44caf
[Automatic] Updated components cache
All checks were successful
continuous-integration/drone/push Build is passing
2024-01-19 00:50:24 +00:00
16 changed files with 361 additions and 380 deletions

View file

@ -6,11 +6,9 @@ Backends
:maxdepth: 1 :maxdepth: 1
:caption: Backends: :caption: Backends:
platypush/backend/adafruit.io.rst
platypush/backend/button.flic.rst platypush/backend/button.flic.rst
platypush/backend/camera.pi.rst platypush/backend/camera.pi.rst
platypush/backend/chat.telegram.rst platypush/backend/chat.telegram.rst
platypush/backend/google.pubsub.rst
platypush/backend/gps.rst platypush/backend/gps.rst
platypush/backend/http.rst platypush/backend/http.rst
platypush/backend/mail.rst platypush/backend/mail.rst

View file

@ -1,6 +0,0 @@
``adafruit.io``
=================================
.. automodule:: platypush.backend.adafruit.io
:members:

View file

@ -1,5 +0,0 @@
``google.pubsub``
===================================
.. automodule:: platypush.backend.google.pubsub
:members:

View file

@ -1,97 +0,0 @@
from typing import Optional
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.adafruit import (
ConnectedEvent,
DisconnectedEvent,
FeedUpdateEvent,
)
class AdafruitIoBackend(Backend):
"""
Backend that listens to messages received over the Adafruit IO message queue
Requires:
* The :class:`platypush.plugins.adafruit.io.AdafruitIoPlugin` plugin to
be active and configured.
"""
def __init__(self, feeds, *args, **kwargs):
"""
:param feeds: List of feed IDs to monitor
:type feeds: list[str]
"""
super().__init__(*args, **kwargs)
from Adafruit_IO import MQTTClient
self.feeds = feeds
self._client: Optional[MQTTClient] = None
def _init_client(self):
if self._client:
return
from Adafruit_IO import MQTTClient
plugin = get_plugin('adafruit.io')
if not plugin:
raise RuntimeError('Adafruit IO plugin not configured')
# noinspection PyProtectedMember
self._client = MQTTClient(plugin._username, plugin._key)
self._client.on_connect = self.on_connect()
self._client.on_disconnect = self.on_disconnect()
self._client.on_message = self.on_message()
def on_connect(self):
def _handler(client):
for feed in self.feeds:
client.subscribe(feed)
self.bus.post(ConnectedEvent())
return _handler
def on_disconnect(self):
def _handler(*_, **__):
self.bus.post(DisconnectedEvent())
return _handler
def on_message(self, *_, **__):
# noinspection PyUnusedLocal
def _handler(client, feed, data):
try:
data = float(data)
except Exception as e:
self.logger.debug('Not a number: {}: {}'.format(data, e))
self.bus.post(FeedUpdateEvent(feed=feed, data=data))
return _handler
def run(self):
super().run()
self.logger.info(
('Initialized Adafruit IO backend, listening on ' + 'feeds {}').format(
self.feeds
)
)
while not self.should_stop():
try:
self._init_client()
# noinspection PyUnresolvedReferences
self._client.connect()
# noinspection PyUnresolvedReferences
self._client.loop_blocking()
except Exception as e:
self.logger.exception(e)
self._client = None
# vim:sw=4:ts=4:et:

View file

@ -1,12 +0,0 @@
manifest:
events:
platypush.message.event.adafruit.ConnectedEvent: when thebackend connects to the
Adafruit queue
platypush.message.event.adafruit.DisconnectedEvent: when thebackend disconnects
from the Adafruit queue
platypush.message.event.adafruit.FeedUpdateEvent: when anupdate event is received
on a monitored feed
install:
pip: []
package: platypush.backend.adafruit.io
type: backend

View file

@ -1,88 +0,0 @@
import json
from typing import Optional, List
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
class GooglePubsubBackend(Backend):
"""
Subscribe to a list of topics on a Google Pub/Sub instance. See
:class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your
project and credentials file.
"""
def __init__(
self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs
):
"""
:param topics: List of topics to subscribe. You can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
Google Pub/Sub project, or just ``<topic_name>`` - in such case it's implied that you refer to the
``topic_name`` under the ``project_id`` of your service credentials.
:param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the
``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
"""
super().__init__(*args, name='GooglePubSub', **kwargs)
self.topics = topics
if credentials_file:
self.credentials_file = credentials_file
else:
plugin = self._get_plugin()
self.credentials_file = plugin.credentials_file
@staticmethod
def _get_plugin():
plugin = get_plugin('google.pubsub')
assert plugin, 'google.pubsub plugin not enabled'
return plugin
def _message_callback(self, topic):
def callback(msg):
data = msg.data.decode()
try:
data = json.loads(data)
except Exception as e:
self.logger.debug('Not a valid JSON: %s: %s', data, e)
msg.ack()
self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
return callback
def run(self):
# noinspection PyPackageRequirements
from google.cloud import pubsub_v1
# noinspection PyPackageRequirements
from google.api_core.exceptions import AlreadyExists
super().run()
plugin = self._get_plugin()
project_id = plugin.get_project_id()
credentials = plugin.get_credentials(plugin.subscriber_audience)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
for topic in self.topics:
prefix = f'projects/{project_id}/topics/'
if not topic.startswith(prefix):
topic = f'{prefix}{topic}'
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
)
try:
subscriber.create_subscription(name=subscription_name, topic=topic)
except AlreadyExists:
pass
subscriber.subscribe(subscription_name, self._message_callback(topic))
self.wait_stop()
# vim:sw=4:ts=4:et:

View file

@ -1,9 +0,0 @@
manifest:
events:
platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message
is received ona subscribed topic.
install:
pip:
- google-cloud-pubsub
package: platypush.backend.google.pubsub
type: backend

Binary file not shown.

View file

@ -1,27 +1,28 @@
from platypush.message.event import Event from platypush.message.event import Event
class ConnectedEvent(Event): class AdafruitConnectedEvent(Event):
""" """
Event triggered when the backend connects to the Adafruit message queue Event triggered when the backend connects to the Adafruit message queue.
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
class DisconnectedEvent(Event): class AdafruitDisconnectedEvent(Event):
""" """
Event triggered when the backend disconnects from the Adafruit message queue Event triggered when the backend disconnects from the Adafruit message
queue.
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
class FeedUpdateEvent(Event): class AdafruitFeedUpdateEvent(Event):
""" """
Event triggered upon Adafruit IO feed update Event triggered when a message is received on a subscribed Adafruit feed.
""" """
def __init__(self, feed, data, *args, **kwargs): def __init__(self, feed, data, *args, **kwargs):

View file

@ -1,26 +1,30 @@
import ast
import statistics import statistics
import json
import time import time
from threading import Thread, Lock from queue import Empty, Queue
from threading import Thread
from typing import Iterable, Optional, Union
from platypush.context import get_backend from platypush.message.event.adafruit import (
from platypush.plugins import Plugin, action AdafruitConnectedEvent,
AdafruitDisconnectedEvent,
data_throttler_lock = None AdafruitFeedUpdateEvent,
)
from platypush.plugins import RunnablePlugin, action
class AdafruitIoPlugin(Plugin): class AdafruitIoPlugin(RunnablePlugin):
""" """
This plugin allows you to interact with the Adafruit IO This plugin allows you to interact with `Adafruit IO
<https://io.adafruit.com>, a cloud-based message queue and storage. <https://io.adafruit.com>`_, a cloud-based message queue and storage. You
You can send values to feeds on your Adafruit IO account and read the can use this plugin to send and receive data to topics connected to your
values of those feeds as well through any device. Adafruit IO account.
Some example usages:: Some example usages:
# Send the temperature value for a connected sensor to the "temperature" feed .. code-block:: javascript
// Send the temperature value for a connected sensor to the "temperature" feed
{ {
"type": "request", "type": "request",
"action": "adafruit.io.send", "action": "adafruit.io.send",
@ -30,7 +34,7 @@ class AdafruitIoPlugin(Plugin):
} }
} }
# Receive the most recent temperature value // Receive the most recent temperature value
{ {
"type": "request", "type": "request",
"action": "adafruit.io.receive", "action": "adafruit.io.receive",
@ -38,164 +42,194 @@ class AdafruitIoPlugin(Plugin):
"feed": "temperature" "feed": "temperature"
} }
} }
""" """
_DATA_THROTTLER_QUEUE = 'platypush/adafruit.io' def __init__(
self,
def __init__(self, username, key, throttle_seconds=None, **kwargs): username: str,
key: str,
feeds: Iterable[str] = (),
throttle_interval: Optional[float] = None,
**kwargs
):
""" """
:param username: Your Adafruit username :param username: Your Adafruit username
:type username: str
:param key: Your Adafruit IO key :param key: Your Adafruit IO key
:type key: str :param feeds: List of feeds to subscribe to. If not set, then the plugin
will not subscribe to any feed unless instructed to do so, neither
:param throttle_seconds: If set, then instead of sending the values directly over ``send`` the plugin will it will emit any event.
first collect all the samples within the specified period and then dispatch them to Adafruit IO. :param throttle_interval: If set, then instead of sending the values
You may want to set it if you have data sources providing a lot of data points and you don't want to hit directly over ``send`` the plugin will first collect all the
the throttling limitations of Adafruit. samples within the specified period and then dispatch them to
:type throttle_seconds: float Adafruit IO. You may want to set it if you have data sources
providing a lot of data points and you don't want to hit the
throttling limitations of Adafruit.
""" """
from Adafruit_IO import Client from Adafruit_IO import Client, MQTTClient
global data_throttler_lock
super().__init__(**kwargs) super().__init__(**kwargs)
self._username = username self._username = username
self._key = key self._key = key
self.feeds = feeds
self.aio = Client(username=username, key=key) self.aio = Client(username=username, key=key)
self.throttle_seconds = throttle_seconds self._mqtt_client: Optional[MQTTClient] = None
self.throttle_interval = throttle_interval
self._data_throttler_thread: Optional[Thread] = None
self._data_throttler_queue = Queue()
if not data_throttler_lock: @property
data_throttler_lock = Lock() def _mqtt(self):
if not self._mqtt_client:
from Adafruit_IO import MQTTClient
if self.throttle_seconds and not data_throttler_lock.locked(): self._mqtt_client = MQTTClient(self._username, self._key)
self._get_redis() self._mqtt_client.on_connect = self._on_connect # type: ignore
self.logger.info('Starting Adafruit IO throttler thread') self._mqtt_client.on_disconnect = self._on_disconnect # type: ignore
data_throttler_lock.acquire(False) self._mqtt_client.on_message = self._on_message # type: ignore
self.data_throttler = Thread(target=self._data_throttler())
self.data_throttler.start()
@staticmethod return self._mqtt_client
def _get_redis():
from redis import Redis
redis_args = { def _on_connect(self, *_, **__):
'host': 'localhost', assert self._mqtt_client, 'MQTT client not initialized'
} for feed in self.feeds:
self._mqtt_client.subscribe(feed)
self._bus.post(AdafruitConnectedEvent())
redis_backend = get_backend('redis') def _on_disconnect(self, *_, **__):
if redis_backend: self._bus.post(AdafruitDisconnectedEvent())
redis_args = get_backend('redis').redis_args
redis_args['socket_timeout'] = 1 def _on_message(self, _, feed, data, *__):
return Redis(**redis_args) try:
data = float(data)
except (TypeError, ValueError):
pass
self._bus.post(AdafruitFeedUpdateEvent(feed=feed, data=data))
def _subscribe(self, feed: str):
assert self._mqtt_client, 'MQTT client not initialized'
self._mqtt_client.subscribe(feed)
def _unsubscribe(self, feed: str):
assert self._mqtt_client, 'MQTT client not initialized'
self._mqtt_client.unsubscribe(feed)
def _data_throttler(self): def _data_throttler(self):
from redis.exceptions import TimeoutError as QueueTimeoutError from Adafruit_IO import ThrottlingError
def run(): if not self.throttle_interval:
from Adafruit_IO import ThrottlingError return
redis = self._get_redis() last_processed_batch_timestamp = None
last_processed_batch_timestamp = None data = {}
data = {}
try: try:
while True: while not self.should_stop():
try: try:
new_data = ast.literal_eval( new_data = self._data_throttler_queue.get(timeout=1)
redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8') for key, value in new_data.items():
) data.setdefault(key, []).append(value)
except Empty:
continue
for (key, value) in new_data.items(): should_process = data and (
data.setdefault(key, []).append(value) last_processed_batch_timestamp is None
except QueueTimeoutError: or time.time() - last_processed_batch_timestamp
pass >= self.throttle_interval
)
if data and ( if not should_process:
last_processed_batch_timestamp is None continue
or time.time() - last_processed_batch_timestamp
>= self.throttle_seconds
):
last_processed_batch_timestamp = time.time()
self.logger.info('Processing feeds batch for Adafruit IO')
for (feed, values) in data.items(): last_processed_batch_timestamp = time.time()
if values: self.logger.info('Processing feeds batch for Adafruit IO')
value = statistics.mean(values)
try: for feed, values in data.items():
self.send(feed, value, enqueue=False) if values:
except ThrottlingError: value = statistics.mean(values)
self.logger.warning(
'Adafruit IO throttling threshold hit, taking a nap '
+ 'before retrying'
)
time.sleep(self.throttle_seconds)
data = {} try:
except Exception as e: self.send(feed, value, enqueue=False)
self.logger.exception(e) except ThrottlingError:
self.logger.warning(
'Adafruit IO throttling threshold hit, taking a nap '
+ 'before retrying'
)
self.wait_stop(self.throttle_interval)
return run data = {}
except Exception as e:
self.logger.exception(e)
@action @action
def send(self, feed, value, enqueue=True): def send(self, feed: str, value: Union[int, float, str], enqueue: bool = True):
""" """
Send a value to an Adafruit IO feed Send a value to an Adafruit IO feed.
:param feed: Feed name :param feed: Feed name.
:type feed: str :param value: Value to send.
:param enqueue: If throttle_interval is set, this method by default will append values to the throttling queue
:param value: Value to send
:type value: Numeric or string
:param enqueue: If throttle_seconds is set, this method by default will append values to the throttling queue
to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to
override the behaviour and send the message directly instead. override the behaviour and send the message directly instead.
:type enqueue: bool
""" """
if not self.throttle_seconds or not enqueue: if not self.throttle_interval or not enqueue:
# If no throttling is configured, or enqueue is false then send the value directly to Adafruit # If no throttling is configured, or enqueue is false then send the value directly to Adafruit
self.aio.send(feed, value) self.aio.send(feed, value)
else: else:
# Otherwise send it to the Redis queue to be picked up by the throttler thread # Otherwise send it to the queue to be picked up by the throttler thread
redis = self._get_redis() self._data_throttler_queue.put({feed: value})
redis.rpush(self._DATA_THROTTLER_QUEUE, json.dumps({feed: value}))
@action @action
def send_location_data(self, feed, lat, lon, ele, value): def send_location_data(
self,
feed: str,
latitude: float,
longitude: float,
elevation: float,
value: Optional[Union[int, float, str]] = None,
):
""" """
Send location data to an Adafruit IO feed Send location data to an Adafruit IO feed
:param feed: Feed name :param feed: Feed name
:type feed: str
:param lat: Latitude :param lat: Latitude
:type lat: float
:param lon: Longitude :param lon: Longitude
:type lon: float
:param ele: Elevation :param ele: Elevation
:type ele: float :param value: Extra value to attach to the record
:param value: Value to send
:type value: Numeric or string
""" """
self.aio.send_data( self.aio.send_data(
feed=feed, feed=feed,
value=value, value=value,
metadata={ metadata={
'lat': lat, 'lat': latitude,
'lon': lon, 'lon': longitude,
'ele': ele, 'ele': elevation,
}, },
) )
@action
def subscribe(self, feed: str):
"""
Subscribe to a feed.
:param feed: Feed name
"""
self._subscribe(feed)
@action
def unsubscribe(self, feed: str):
"""
Unsubscribe from a feed.
:param feed: Feed name
"""
self._unsubscribe(feed)
@classmethod @classmethod
def _cast_value(cls, value): def _cast_value(cls, value):
try: try:
@ -220,17 +254,14 @@ class AdafruitIoPlugin(Plugin):
] ]
@action @action
def receive(self, feed, limit=1): def receive(self, feed: str, limit: int = 1):
""" """
Receive data from the specified Adafruit IO feed Receive data from the specified Adafruit IO feed
:param feed: Feed name :param feed: Feed name
:type feed: str
:param limit: Maximum number of data points to be returned. If None, :param limit: Maximum number of data points to be returned. If None,
all the values in the feed will be returned. Default: 1 (return most all the values in the feed will be returned. Default: 1 (return most
recent value) recent value)
:type limit: int
""" """
if limit == 1: if limit == 1:
@ -241,42 +272,84 @@ class AdafruitIoPlugin(Plugin):
return values[:limit] if limit else values return values[:limit] if limit else values
@action @action
def receive_next(self, feed): def receive_next(self, feed: str):
""" """
Receive the next unprocessed data point from a feed Receive the next unprocessed data point from a feed
:param feed: Feed name :param feed: Feed name
:type feed: str
""" """
values = self._convert_data_to_dict(self.aio.receive_next(feed)) values = self._convert_data_to_dict(self.aio.receive_next(feed))
return values[0] if values else None return values[0] if values else None
@action @action
def receive_previous(self, feed): def receive_previous(self, feed: str):
""" """
Receive the last processed data point from a feed Receive the last processed data point from a feed
:param feed: Feed name :param feed: Feed name
:type feed: str
""" """
values = self._convert_data_to_dict(self.aio.receive_previous(feed)) values = self._convert_data_to_dict(self.aio.receive_previous(feed))
return values[0] if values else None return values[0] if values else None
@action @action
def delete(self, feed, data_id): def delete(self, feed: str, data_id: str):
""" """
Delete a data point from a feed Delete a data point from a feed
:param feed: Feed name :param feed: Feed name
:type feed: str
:param data_id: Data point ID to remove :param data_id: Data point ID to remove
:type data_id: str
""" """
self.aio.delete(feed, data_id) self.aio.delete(feed, data_id)
def main(self):
if self.throttle_interval:
self._data_throttler_thread = Thread(target=self._data_throttler)
self._data_throttler_thread.start()
try:
while not self.should_stop():
cur_wait = 1
max_wait = 60
try:
self._mqtt.connect()
cur_wait = 1
self._mqtt.loop_blocking()
except Exception as e:
self.logger.warning(
'Adafruit IO connection error: %s, retrying in %d seconds',
e,
cur_wait,
)
self.logger.exception(e)
self.wait_stop(cur_wait)
cur_wait = min(cur_wait * 2, max_wait)
finally:
self._stop_mqtt()
finally:
self._stop_data_throttler()
def _stop_mqtt(self):
if self._mqtt_client:
self._mqtt_client.disconnect()
self._mqtt_client = None
def _stop_data_throttler(self):
if self._data_throttler_thread:
self._data_throttler_thread.join()
self._data_throttler_thread = None
def _stop(self):
self._stop_mqtt()
self._stop_data_throttler()
def stop(self):
self._stop()
super().stop()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,5 +1,8 @@
manifest: manifest:
events: {} events:
- platypush.message.event.adafruit.AdafruitConnectedEvent
- platypush.message.event.adafruit.AdafruitDisconnectedEvent
- platypush.message.event.adafruit.AdafruitFeedUpdateEvent
install: install:
pip: pip:
- adafruit-io - adafruit-io

View file

@ -1,59 +1,87 @@
import json import json
import os import os
from threading import RLock
from typing import Iterable, Optional
from platypush.plugins import Plugin, action from google.auth import jwt
from google.cloud import pubsub_v1 as pubsub # pylint: disable=no-name-in-module
from google.api_core.exceptions import AlreadyExists, NotFound
from platypush.config import Config
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
from platypush.plugins import RunnablePlugin, action
class GooglePubsubPlugin(Plugin): class GooglePubsubPlugin(RunnablePlugin):
""" """
Send messages over a Google pub/sub instance. Publishes and subscribes to Google Pub/Sub topics.
You'll need a Google Cloud active project and a set of credentials to use this plugin:
You'll need a Google Cloud active project and a set of credentials to use
this plugin:
1. Create a project on the `Google Cloud console 1. Create a project on the `Google Cloud console
<https://console.cloud.google.com/projectcreate>`_ if you don't have <https://console.cloud.google.com/projectcreate>`_ if you don't have
one already. one already.
2. In the `Google Cloud API console 2. In the `Google Cloud API console
<https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_ <https://console.cloud.google.com/apis/credentials/serviceaccountkey>`_
create a new service account key. Select "New Service Account", choose create a new service account key. Select "New Service Account", choose
the role "Pub/Sub Editor" and leave the key type as JSON. the role "Pub/Sub Editor" and leave the key type as JSON.
3. Download the JSON service credentials file. By default Platypush 3. Download the JSON service credentials file. By default Platypush
will look for the credentials file under will look for the credentials file under
``~/.credentials/platypush/google/pubsub.json``. ``<WORKDIR>/credentials/google/pubsub.json``.
""" """
publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher' publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber' subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
default_credentials_file = os.path.join( default_credentials_file = os.path.join(
os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json' Config.get_workdir(), 'credentials', 'google', 'pubsub.json'
) )
def __init__(self, credentials_file: str = default_credentials_file, **kwargs): def __init__(
self,
credentials_file: str = default_credentials_file,
topics: Iterable[str] = (),
**kwargs,
):
""" """
:param credentials_file: Path to the JSON credentials file for Google :param credentials_file: Path to the JSON credentials file for Google
pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``) pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
:param topics: List of topics to subscribe. You can either specify the
full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
credentials.
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.credentials_file = credentials_file self.credentials_file = credentials_file
self.project_id = self.get_project_id() self.project_id = self.get_project_id()
self.topics = topics
self._subscriber: Optional[pubsub.SubscriberClient] = None
self._subscriber_lock = RLock()
def get_project_id(self): def get_project_id(self):
with open(self.credentials_file) as f: with open(self.credentials_file) as f:
return json.load(f).get('project_id') return json.load(f).get('project_id')
def get_credentials(self, audience: str): def get_credentials(self, audience: str):
from google.auth import jwt
return jwt.Credentials.from_service_account_file( return jwt.Credentials.from_service_account_file(
self.credentials_file, audience=audience self.credentials_file, audience=audience
) )
def _norm_topic(self, topic: str):
if not topic.startswith(f'projects/{self.project_id}/topics/'):
topic = f'projects/{self.project_id}/topics/{topic}'
return topic
@action @action
def send_message(self, topic: str, msg, **kwargs): def publish(self, topic: str, msg, **kwargs):
""" """
Sends a message to a topic Publish a message to a topic
:param topic: Topic/channel where the message will be delivered. You :param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format can either specify the full topic name in the format
@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin):
:param msg: Message to be sent. It can be a list, a dict, or a Message object :param msg: Message to be sent. It can be a list, a dict, or a Message object
:param kwargs: Extra arguments to be passed to .publish() :param kwargs: Extra arguments to be passed to .publish()
""" """
from google.cloud import pubsub_v1
from google.api_core.exceptions import AlreadyExists
credentials = self.get_credentials(self.publisher_audience) credentials = self.get_credentials(self.publisher_audience)
publisher = pubsub_v1.PublisherClient(credentials=credentials) publisher = pubsub.PublisherClient(credentials=credentials)
topic = self._norm_topic(topic)
if not topic.startswith(f'projects/{self.project_id}/topics/'):
topic = f'projects/{self.project_id}/topics/{topic}'
try: try:
publisher.create_topic(topic) publisher.create_topic(name=topic)
except AlreadyExists: except AlreadyExists:
pass self.logger.debug('Topic %s already exists', topic)
if isinstance(msg, (int, float)): if isinstance(msg, (int, float)):
msg = str(msg) msg = str(msg)
@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin):
publisher.publish(topic, msg, **kwargs) publisher.publish(topic, msg, **kwargs)
@action
def send_message(self, topic: str, msg, **kwargs):
"""
Alias for :meth:`.publish`
"""
self.publish(topic=topic, msg=msg, **kwargs)
@action
def subscribe(self, topic: str):
"""
Subscribe to a topic.
:param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
credentials.
"""
assert self._subscriber, 'Subscriber not initialized'
topic = self._norm_topic(topic)
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
)
try:
self._subscriber.create_subscription(name=subscription_name, topic=topic)
except AlreadyExists:
self.logger.debug('Subscription %s already exists', subscription_name)
self._subscriber.subscribe(subscription_name, self._message_callback(topic))
@action
def unsubscribe(self, topic: str):
"""
Unsubscribe from a topic.
:param topic: Topic/channel where the message will be delivered. You
can either specify the full topic name in the format
``projects/<project_id>/topics/<topic_name>``, where
``<project_id>`` must be the ID of your Google Pub/Sub project, or
just ``<topic_name>`` - in such case it's implied that you refer to
the ``topic_name`` under the ``project_id`` of your service
credentials.
"""
assert self._subscriber, 'Subscriber not initialized'
topic = self._norm_topic(topic)
subscription_name = '/'.join(
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
)
try:
self._subscriber.delete_subscription(subscription=subscription_name)
except NotFound:
self.logger.debug('Subscription %s not found', subscription_name)
def _message_callback(self, topic):
def callback(msg):
data = msg.data.decode()
try:
data = json.loads(data)
except Exception as e:
self.logger.debug('Not a valid JSON: %s: %s', data, e)
msg.ack()
self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
return callback
def main(self):
credentials = self.get_credentials(self.subscriber_audience)
with self._subscriber_lock:
self._subscriber = pubsub.SubscriberClient(credentials=credentials)
for topic in self.topics:
self.subscribe(topic=topic)
self.wait_stop()
with self._subscriber_lock:
self._close()
def _close(self):
with self._subscriber_lock:
if self._subscriber:
try:
self._subscriber.close()
except Exception as e:
self.logger.debug('Error while closing the subscriber: %s', e)
self._subscriber = None
def stop(self):
self._close()
super().stop()
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,5 +1,6 @@
manifest: manifest:
events: {} events:
- platypush.message.event.google.pubsub.GooglePubsubMessageEvent
install: install:
apk: apk:
- py3-google-api-python-client - py3-google-api-python-client

View file

@ -35,10 +35,13 @@ mock_imports = [
"gi", "gi",
"gi.repository", "gi.repository",
"google", "google",
"google.api_core",
"google.auth",
"google.assistant.embedded", "google.assistant.embedded",
"google.assistant.library", "google.assistant.library",
"google.assistant.library.event", "google.assistant.library.event",
"google.assistant.library.file_helpers", "google.assistant.library.file_helpers",
"google.cloud",
"google.oauth2.credentials", "google.oauth2.credentials",
"googlesamples", "googlesamples",
"googlesamples.assistant.grpc.audio_helpers", "googlesamples.assistant.grpc.audio_helpers",