Compare commits
5 commits
1323156838
...
d82a5ecb1e
Author | SHA1 | Date | |
---|---|---|---|
d82a5ecb1e | |||
01e8296406 | |||
89c5283ee9 | |||
8a84a36905 | |||
2e9cb44caf |
16 changed files with 361 additions and 380 deletions
|
@ -6,11 +6,9 @@ Backends
|
||||||
:maxdepth: 1
|
:maxdepth: 1
|
||||||
:caption: Backends:
|
:caption: Backends:
|
||||||
|
|
||||||
platypush/backend/adafruit.io.rst
|
|
||||||
platypush/backend/button.flic.rst
|
platypush/backend/button.flic.rst
|
||||||
platypush/backend/camera.pi.rst
|
platypush/backend/camera.pi.rst
|
||||||
platypush/backend/chat.telegram.rst
|
platypush/backend/chat.telegram.rst
|
||||||
platypush/backend/google.pubsub.rst
|
|
||||||
platypush/backend/gps.rst
|
platypush/backend/gps.rst
|
||||||
platypush/backend/http.rst
|
platypush/backend/http.rst
|
||||||
platypush/backend/mail.rst
|
platypush/backend/mail.rst
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
``adafruit.io``
|
|
||||||
=================================
|
|
||||||
|
|
||||||
.. automodule:: platypush.backend.adafruit.io
|
|
||||||
:members:
|
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
``google.pubsub``
|
|
||||||
===================================
|
|
||||||
|
|
||||||
.. automodule:: platypush.backend.google.pubsub
|
|
||||||
:members:
|
|
|
@ -1,97 +0,0 @@
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from platypush.backend import Backend
|
|
||||||
from platypush.context import get_plugin
|
|
||||||
from platypush.message.event.adafruit import (
|
|
||||||
ConnectedEvent,
|
|
||||||
DisconnectedEvent,
|
|
||||||
FeedUpdateEvent,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class AdafruitIoBackend(Backend):
|
|
||||||
"""
|
|
||||||
Backend that listens to messages received over the Adafruit IO message queue
|
|
||||||
|
|
||||||
Requires:
|
|
||||||
|
|
||||||
* The :class:`platypush.plugins.adafruit.io.AdafruitIoPlugin` plugin to
|
|
||||||
be active and configured.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, feeds, *args, **kwargs):
|
|
||||||
"""
|
|
||||||
:param feeds: List of feed IDs to monitor
|
|
||||||
:type feeds: list[str]
|
|
||||||
"""
|
|
||||||
|
|
||||||
super().__init__(*args, **kwargs)
|
|
||||||
from Adafruit_IO import MQTTClient
|
|
||||||
|
|
||||||
self.feeds = feeds
|
|
||||||
self._client: Optional[MQTTClient] = None
|
|
||||||
|
|
||||||
def _init_client(self):
|
|
||||||
if self._client:
|
|
||||||
return
|
|
||||||
|
|
||||||
from Adafruit_IO import MQTTClient
|
|
||||||
|
|
||||||
plugin = get_plugin('adafruit.io')
|
|
||||||
if not plugin:
|
|
||||||
raise RuntimeError('Adafruit IO plugin not configured')
|
|
||||||
|
|
||||||
# noinspection PyProtectedMember
|
|
||||||
self._client = MQTTClient(plugin._username, plugin._key)
|
|
||||||
self._client.on_connect = self.on_connect()
|
|
||||||
self._client.on_disconnect = self.on_disconnect()
|
|
||||||
self._client.on_message = self.on_message()
|
|
||||||
|
|
||||||
def on_connect(self):
|
|
||||||
def _handler(client):
|
|
||||||
for feed in self.feeds:
|
|
||||||
client.subscribe(feed)
|
|
||||||
self.bus.post(ConnectedEvent())
|
|
||||||
|
|
||||||
return _handler
|
|
||||||
|
|
||||||
def on_disconnect(self):
|
|
||||||
def _handler(*_, **__):
|
|
||||||
self.bus.post(DisconnectedEvent())
|
|
||||||
|
|
||||||
return _handler
|
|
||||||
|
|
||||||
def on_message(self, *_, **__):
|
|
||||||
# noinspection PyUnusedLocal
|
|
||||||
def _handler(client, feed, data):
|
|
||||||
try:
|
|
||||||
data = float(data)
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.debug('Not a number: {}: {}'.format(data, e))
|
|
||||||
|
|
||||||
self.bus.post(FeedUpdateEvent(feed=feed, data=data))
|
|
||||||
|
|
||||||
return _handler
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
super().run()
|
|
||||||
|
|
||||||
self.logger.info(
|
|
||||||
('Initialized Adafruit IO backend, listening on ' + 'feeds {}').format(
|
|
||||||
self.feeds
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
while not self.should_stop():
|
|
||||||
try:
|
|
||||||
self._init_client()
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
self._client.connect()
|
|
||||||
# noinspection PyUnresolvedReferences
|
|
||||||
self._client.loop_blocking()
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.exception(e)
|
|
||||||
self._client = None
|
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
|
|
@ -1,12 +0,0 @@
|
||||||
manifest:
|
|
||||||
events:
|
|
||||||
platypush.message.event.adafruit.ConnectedEvent: when thebackend connects to the
|
|
||||||
Adafruit queue
|
|
||||||
platypush.message.event.adafruit.DisconnectedEvent: when thebackend disconnects
|
|
||||||
from the Adafruit queue
|
|
||||||
platypush.message.event.adafruit.FeedUpdateEvent: when anupdate event is received
|
|
||||||
on a monitored feed
|
|
||||||
install:
|
|
||||||
pip: []
|
|
||||||
package: platypush.backend.adafruit.io
|
|
||||||
type: backend
|
|
|
@ -1,88 +0,0 @@
|
||||||
import json
|
|
||||||
|
|
||||||
from typing import Optional, List
|
|
||||||
|
|
||||||
from platypush.backend import Backend
|
|
||||||
from platypush.context import get_plugin
|
|
||||||
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
|
|
||||||
|
|
||||||
|
|
||||||
class GooglePubsubBackend(Backend):
|
|
||||||
"""
|
|
||||||
Subscribe to a list of topics on a Google Pub/Sub instance. See
|
|
||||||
:class:`platypush.plugins.google.pubsub.GooglePubsubPlugin` for a reference on how to generate your
|
|
||||||
project and credentials file.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self, topics: List[str], credentials_file: Optional[str] = None, *args, **kwargs
|
|
||||||
):
|
|
||||||
"""
|
|
||||||
:param topics: List of topics to subscribe. You can either specify the full topic name in the format
|
|
||||||
``projects/<project_id>/topics/<topic_name>``, where ``<project_id>`` must be the ID of your
|
|
||||||
Google Pub/Sub project, or just ``<topic_name>`` - in such case it's implied that you refer to the
|
|
||||||
``topic_name`` under the ``project_id`` of your service credentials.
|
|
||||||
:param credentials_file: Path to the Pub/Sub service credentials file (default: value configured on the
|
|
||||||
``google.pubsub`` plugin or ``~/.credentials/platypush/google/pubsub.json``).
|
|
||||||
"""
|
|
||||||
|
|
||||||
super().__init__(*args, name='GooglePubSub', **kwargs)
|
|
||||||
self.topics = topics
|
|
||||||
|
|
||||||
if credentials_file:
|
|
||||||
self.credentials_file = credentials_file
|
|
||||||
else:
|
|
||||||
plugin = self._get_plugin()
|
|
||||||
self.credentials_file = plugin.credentials_file
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _get_plugin():
|
|
||||||
plugin = get_plugin('google.pubsub')
|
|
||||||
assert plugin, 'google.pubsub plugin not enabled'
|
|
||||||
return plugin
|
|
||||||
|
|
||||||
def _message_callback(self, topic):
|
|
||||||
def callback(msg):
|
|
||||||
data = msg.data.decode()
|
|
||||||
try:
|
|
||||||
data = json.loads(data)
|
|
||||||
except Exception as e:
|
|
||||||
self.logger.debug('Not a valid JSON: %s: %s', data, e)
|
|
||||||
|
|
||||||
msg.ack()
|
|
||||||
self.bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
|
|
||||||
|
|
||||||
return callback
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
# noinspection PyPackageRequirements
|
|
||||||
from google.cloud import pubsub_v1
|
|
||||||
|
|
||||||
# noinspection PyPackageRequirements
|
|
||||||
from google.api_core.exceptions import AlreadyExists
|
|
||||||
|
|
||||||
super().run()
|
|
||||||
plugin = self._get_plugin()
|
|
||||||
project_id = plugin.get_project_id()
|
|
||||||
credentials = plugin.get_credentials(plugin.subscriber_audience)
|
|
||||||
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
|
|
||||||
|
|
||||||
for topic in self.topics:
|
|
||||||
prefix = f'projects/{project_id}/topics/'
|
|
||||||
if not topic.startswith(prefix):
|
|
||||||
topic = f'{prefix}{topic}'
|
|
||||||
subscription_name = '/'.join(
|
|
||||||
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
subscriber.create_subscription(name=subscription_name, topic=topic)
|
|
||||||
except AlreadyExists:
|
|
||||||
pass
|
|
||||||
|
|
||||||
subscriber.subscribe(subscription_name, self._message_callback(topic))
|
|
||||||
|
|
||||||
self.wait_stop()
|
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
|
|
@ -1,9 +0,0 @@
|
||||||
manifest:
|
|
||||||
events:
|
|
||||||
platypush.message.event.google.pubsub.GooglePubsubMessageEvent: when a new message
|
|
||||||
is received ona subscribed topic.
|
|
||||||
install:
|
|
||||||
pip:
|
|
||||||
- google-cloud-pubsub
|
|
||||||
package: platypush.backend.google.pubsub
|
|
||||||
type: backend
|
|
Binary file not shown.
|
@ -1,27 +1,28 @@
|
||||||
from platypush.message.event import Event
|
from platypush.message.event import Event
|
||||||
|
|
||||||
|
|
||||||
class ConnectedEvent(Event):
|
class AdafruitConnectedEvent(Event):
|
||||||
"""
|
"""
|
||||||
Event triggered when the backend connects to the Adafruit message queue
|
Event triggered when the backend connects to the Adafruit message queue.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class DisconnectedEvent(Event):
|
class AdafruitDisconnectedEvent(Event):
|
||||||
"""
|
"""
|
||||||
Event triggered when the backend disconnects from the Adafruit message queue
|
Event triggered when the backend disconnects from the Adafruit message
|
||||||
|
queue.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
class FeedUpdateEvent(Event):
|
class AdafruitFeedUpdateEvent(Event):
|
||||||
"""
|
"""
|
||||||
Event triggered upon Adafruit IO feed update
|
Event triggered when a message is received on a subscribed Adafruit feed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, feed, data, *args, **kwargs):
|
def __init__(self, feed, data, *args, **kwargs):
|
||||||
|
|
|
@ -1,26 +1,30 @@
|
||||||
import ast
|
|
||||||
import statistics
|
import statistics
|
||||||
import json
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from threading import Thread, Lock
|
from queue import Empty, Queue
|
||||||
|
from threading import Thread
|
||||||
|
from typing import Iterable, Optional, Union
|
||||||
|
|
||||||
from platypush.context import get_backend
|
from platypush.message.event.adafruit import (
|
||||||
from platypush.plugins import Plugin, action
|
AdafruitConnectedEvent,
|
||||||
|
AdafruitDisconnectedEvent,
|
||||||
data_throttler_lock = None
|
AdafruitFeedUpdateEvent,
|
||||||
|
)
|
||||||
|
from platypush.plugins import RunnablePlugin, action
|
||||||
|
|
||||||
|
|
||||||
class AdafruitIoPlugin(Plugin):
|
class AdafruitIoPlugin(RunnablePlugin):
|
||||||
"""
|
"""
|
||||||
This plugin allows you to interact with the Adafruit IO
|
This plugin allows you to interact with `Adafruit IO
|
||||||
<https://io.adafruit.com>, a cloud-based message queue and storage.
|
<https://io.adafruit.com>`_, a cloud-based message queue and storage. You
|
||||||
You can send values to feeds on your Adafruit IO account and read the
|
can use this plugin to send and receive data to topics connected to your
|
||||||
values of those feeds as well through any device.
|
Adafruit IO account.
|
||||||
|
|
||||||
Some example usages::
|
Some example usages:
|
||||||
|
|
||||||
# Send the temperature value for a connected sensor to the "temperature" feed
|
.. code-block:: javascript
|
||||||
|
|
||||||
|
// Send the temperature value for a connected sensor to the "temperature" feed
|
||||||
{
|
{
|
||||||
"type": "request",
|
"type": "request",
|
||||||
"action": "adafruit.io.send",
|
"action": "adafruit.io.send",
|
||||||
|
@ -30,7 +34,7 @@ class AdafruitIoPlugin(Plugin):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Receive the most recent temperature value
|
// Receive the most recent temperature value
|
||||||
{
|
{
|
||||||
"type": "request",
|
"type": "request",
|
||||||
"action": "adafruit.io.receive",
|
"action": "adafruit.io.receive",
|
||||||
|
@ -38,90 +42,112 @@ class AdafruitIoPlugin(Plugin):
|
||||||
"feed": "temperature"
|
"feed": "temperature"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
_DATA_THROTTLER_QUEUE = 'platypush/adafruit.io'
|
def __init__(
|
||||||
|
self,
|
||||||
def __init__(self, username, key, throttle_seconds=None, **kwargs):
|
username: str,
|
||||||
|
key: str,
|
||||||
|
feeds: Iterable[str] = (),
|
||||||
|
throttle_interval: Optional[float] = None,
|
||||||
|
**kwargs
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
:param username: Your Adafruit username
|
:param username: Your Adafruit username
|
||||||
:type username: str
|
|
||||||
|
|
||||||
:param key: Your Adafruit IO key
|
:param key: Your Adafruit IO key
|
||||||
:type key: str
|
:param feeds: List of feeds to subscribe to. If not set, then the plugin
|
||||||
|
will not subscribe to any feed unless instructed to do so, neither
|
||||||
:param throttle_seconds: If set, then instead of sending the values directly over ``send`` the plugin will
|
it will emit any event.
|
||||||
first collect all the samples within the specified period and then dispatch them to Adafruit IO.
|
:param throttle_interval: If set, then instead of sending the values
|
||||||
You may want to set it if you have data sources providing a lot of data points and you don't want to hit
|
directly over ``send`` the plugin will first collect all the
|
||||||
the throttling limitations of Adafruit.
|
samples within the specified period and then dispatch them to
|
||||||
:type throttle_seconds: float
|
Adafruit IO. You may want to set it if you have data sources
|
||||||
|
providing a lot of data points and you don't want to hit the
|
||||||
|
throttling limitations of Adafruit.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from Adafruit_IO import Client
|
from Adafruit_IO import Client, MQTTClient
|
||||||
|
|
||||||
global data_throttler_lock
|
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
|
|
||||||
self._username = username
|
self._username = username
|
||||||
self._key = key
|
self._key = key
|
||||||
|
self.feeds = feeds
|
||||||
self.aio = Client(username=username, key=key)
|
self.aio = Client(username=username, key=key)
|
||||||
self.throttle_seconds = throttle_seconds
|
self._mqtt_client: Optional[MQTTClient] = None
|
||||||
|
self.throttle_interval = throttle_interval
|
||||||
|
self._data_throttler_thread: Optional[Thread] = None
|
||||||
|
self._data_throttler_queue = Queue()
|
||||||
|
|
||||||
if not data_throttler_lock:
|
@property
|
||||||
data_throttler_lock = Lock()
|
def _mqtt(self):
|
||||||
|
if not self._mqtt_client:
|
||||||
|
from Adafruit_IO import MQTTClient
|
||||||
|
|
||||||
if self.throttle_seconds and not data_throttler_lock.locked():
|
self._mqtt_client = MQTTClient(self._username, self._key)
|
||||||
self._get_redis()
|
self._mqtt_client.on_connect = self._on_connect # type: ignore
|
||||||
self.logger.info('Starting Adafruit IO throttler thread')
|
self._mqtt_client.on_disconnect = self._on_disconnect # type: ignore
|
||||||
data_throttler_lock.acquire(False)
|
self._mqtt_client.on_message = self._on_message # type: ignore
|
||||||
self.data_throttler = Thread(target=self._data_throttler())
|
|
||||||
self.data_throttler.start()
|
|
||||||
|
|
||||||
@staticmethod
|
return self._mqtt_client
|
||||||
def _get_redis():
|
|
||||||
from redis import Redis
|
|
||||||
|
|
||||||
redis_args = {
|
def _on_connect(self, *_, **__):
|
||||||
'host': 'localhost',
|
assert self._mqtt_client, 'MQTT client not initialized'
|
||||||
}
|
for feed in self.feeds:
|
||||||
|
self._mqtt_client.subscribe(feed)
|
||||||
|
self._bus.post(AdafruitConnectedEvent())
|
||||||
|
|
||||||
redis_backend = get_backend('redis')
|
def _on_disconnect(self, *_, **__):
|
||||||
if redis_backend:
|
self._bus.post(AdafruitDisconnectedEvent())
|
||||||
redis_args = get_backend('redis').redis_args
|
|
||||||
redis_args['socket_timeout'] = 1
|
def _on_message(self, _, feed, data, *__):
|
||||||
return Redis(**redis_args)
|
try:
|
||||||
|
data = float(data)
|
||||||
|
except (TypeError, ValueError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._bus.post(AdafruitFeedUpdateEvent(feed=feed, data=data))
|
||||||
|
|
||||||
|
def _subscribe(self, feed: str):
|
||||||
|
assert self._mqtt_client, 'MQTT client not initialized'
|
||||||
|
self._mqtt_client.subscribe(feed)
|
||||||
|
|
||||||
|
def _unsubscribe(self, feed: str):
|
||||||
|
assert self._mqtt_client, 'MQTT client not initialized'
|
||||||
|
self._mqtt_client.unsubscribe(feed)
|
||||||
|
|
||||||
def _data_throttler(self):
|
def _data_throttler(self):
|
||||||
from redis.exceptions import TimeoutError as QueueTimeoutError
|
|
||||||
|
|
||||||
def run():
|
|
||||||
from Adafruit_IO import ThrottlingError
|
from Adafruit_IO import ThrottlingError
|
||||||
|
|
||||||
redis = self._get_redis()
|
if not self.throttle_interval:
|
||||||
|
return
|
||||||
|
|
||||||
last_processed_batch_timestamp = None
|
last_processed_batch_timestamp = None
|
||||||
data = {}
|
data = {}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
while True:
|
while not self.should_stop():
|
||||||
try:
|
try:
|
||||||
new_data = ast.literal_eval(
|
new_data = self._data_throttler_queue.get(timeout=1)
|
||||||
redis.blpop(self._DATA_THROTTLER_QUEUE)[1].decode('utf-8')
|
for key, value in new_data.items():
|
||||||
)
|
|
||||||
|
|
||||||
for (key, value) in new_data.items():
|
|
||||||
data.setdefault(key, []).append(value)
|
data.setdefault(key, []).append(value)
|
||||||
except QueueTimeoutError:
|
except Empty:
|
||||||
pass
|
continue
|
||||||
|
|
||||||
if data and (
|
should_process = data and (
|
||||||
last_processed_batch_timestamp is None
|
last_processed_batch_timestamp is None
|
||||||
or time.time() - last_processed_batch_timestamp
|
or time.time() - last_processed_batch_timestamp
|
||||||
>= self.throttle_seconds
|
>= self.throttle_interval
|
||||||
):
|
)
|
||||||
|
|
||||||
|
if not should_process:
|
||||||
|
continue
|
||||||
|
|
||||||
last_processed_batch_timestamp = time.time()
|
last_processed_batch_timestamp = time.time()
|
||||||
self.logger.info('Processing feeds batch for Adafruit IO')
|
self.logger.info('Processing feeds batch for Adafruit IO')
|
||||||
|
|
||||||
for (feed, values) in data.items():
|
for feed, values in data.items():
|
||||||
if values:
|
if values:
|
||||||
value = statistics.mean(values)
|
value = statistics.mean(values)
|
||||||
|
|
||||||
|
@ -132,70 +158,78 @@ class AdafruitIoPlugin(Plugin):
|
||||||
'Adafruit IO throttling threshold hit, taking a nap '
|
'Adafruit IO throttling threshold hit, taking a nap '
|
||||||
+ 'before retrying'
|
+ 'before retrying'
|
||||||
)
|
)
|
||||||
time.sleep(self.throttle_seconds)
|
self.wait_stop(self.throttle_interval)
|
||||||
|
|
||||||
data = {}
|
data = {}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.logger.exception(e)
|
self.logger.exception(e)
|
||||||
|
|
||||||
return run
|
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def send(self, feed, value, enqueue=True):
|
def send(self, feed: str, value: Union[int, float, str], enqueue: bool = True):
|
||||||
"""
|
"""
|
||||||
Send a value to an Adafruit IO feed
|
Send a value to an Adafruit IO feed.
|
||||||
|
|
||||||
:param feed: Feed name
|
:param feed: Feed name.
|
||||||
:type feed: str
|
:param value: Value to send.
|
||||||
|
:param enqueue: If throttle_interval is set, this method by default will append values to the throttling queue
|
||||||
:param value: Value to send
|
|
||||||
:type value: Numeric or string
|
|
||||||
|
|
||||||
:param enqueue: If throttle_seconds is set, this method by default will append values to the throttling queue
|
|
||||||
to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to
|
to be periodically flushed instead of sending the message directly. In such case, pass enqueue=False to
|
||||||
override the behaviour and send the message directly instead.
|
override the behaviour and send the message directly instead.
|
||||||
:type enqueue: bool
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not self.throttle_seconds or not enqueue:
|
if not self.throttle_interval or not enqueue:
|
||||||
# If no throttling is configured, or enqueue is false then send the value directly to Adafruit
|
# If no throttling is configured, or enqueue is false then send the value directly to Adafruit
|
||||||
self.aio.send(feed, value)
|
self.aio.send(feed, value)
|
||||||
else:
|
else:
|
||||||
# Otherwise send it to the Redis queue to be picked up by the throttler thread
|
# Otherwise send it to the queue to be picked up by the throttler thread
|
||||||
redis = self._get_redis()
|
self._data_throttler_queue.put({feed: value})
|
||||||
redis.rpush(self._DATA_THROTTLER_QUEUE, json.dumps({feed: value}))
|
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def send_location_data(self, feed, lat, lon, ele, value):
|
def send_location_data(
|
||||||
|
self,
|
||||||
|
feed: str,
|
||||||
|
latitude: float,
|
||||||
|
longitude: float,
|
||||||
|
elevation: float,
|
||||||
|
value: Optional[Union[int, float, str]] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Send location data to an Adafruit IO feed
|
Send location data to an Adafruit IO feed
|
||||||
|
|
||||||
:param feed: Feed name
|
:param feed: Feed name
|
||||||
:type feed: str
|
|
||||||
|
|
||||||
:param lat: Latitude
|
:param lat: Latitude
|
||||||
:type lat: float
|
|
||||||
|
|
||||||
:param lon: Longitude
|
:param lon: Longitude
|
||||||
:type lon: float
|
|
||||||
|
|
||||||
:param ele: Elevation
|
:param ele: Elevation
|
||||||
:type ele: float
|
:param value: Extra value to attach to the record
|
||||||
|
|
||||||
:param value: Value to send
|
|
||||||
:type value: Numeric or string
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.aio.send_data(
|
self.aio.send_data(
|
||||||
feed=feed,
|
feed=feed,
|
||||||
value=value,
|
value=value,
|
||||||
metadata={
|
metadata={
|
||||||
'lat': lat,
|
'lat': latitude,
|
||||||
'lon': lon,
|
'lon': longitude,
|
||||||
'ele': ele,
|
'ele': elevation,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def subscribe(self, feed: str):
|
||||||
|
"""
|
||||||
|
Subscribe to a feed.
|
||||||
|
|
||||||
|
:param feed: Feed name
|
||||||
|
"""
|
||||||
|
self._subscribe(feed)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def unsubscribe(self, feed: str):
|
||||||
|
"""
|
||||||
|
Unsubscribe from a feed.
|
||||||
|
|
||||||
|
:param feed: Feed name
|
||||||
|
"""
|
||||||
|
self._unsubscribe(feed)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _cast_value(cls, value):
|
def _cast_value(cls, value):
|
||||||
try:
|
try:
|
||||||
|
@ -220,17 +254,14 @@ class AdafruitIoPlugin(Plugin):
|
||||||
]
|
]
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def receive(self, feed, limit=1):
|
def receive(self, feed: str, limit: int = 1):
|
||||||
"""
|
"""
|
||||||
Receive data from the specified Adafruit IO feed
|
Receive data from the specified Adafruit IO feed
|
||||||
|
|
||||||
:param feed: Feed name
|
:param feed: Feed name
|
||||||
:type feed: str
|
|
||||||
|
|
||||||
:param limit: Maximum number of data points to be returned. If None,
|
:param limit: Maximum number of data points to be returned. If None,
|
||||||
all the values in the feed will be returned. Default: 1 (return most
|
all the values in the feed will be returned. Default: 1 (return most
|
||||||
recent value)
|
recent value)
|
||||||
:type limit: int
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if limit == 1:
|
if limit == 1:
|
||||||
|
@ -241,42 +272,84 @@ class AdafruitIoPlugin(Plugin):
|
||||||
return values[:limit] if limit else values
|
return values[:limit] if limit else values
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def receive_next(self, feed):
|
def receive_next(self, feed: str):
|
||||||
"""
|
"""
|
||||||
Receive the next unprocessed data point from a feed
|
Receive the next unprocessed data point from a feed
|
||||||
|
|
||||||
:param feed: Feed name
|
:param feed: Feed name
|
||||||
:type feed: str
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
values = self._convert_data_to_dict(self.aio.receive_next(feed))
|
values = self._convert_data_to_dict(self.aio.receive_next(feed))
|
||||||
return values[0] if values else None
|
return values[0] if values else None
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def receive_previous(self, feed):
|
def receive_previous(self, feed: str):
|
||||||
"""
|
"""
|
||||||
Receive the last processed data point from a feed
|
Receive the last processed data point from a feed
|
||||||
|
|
||||||
:param feed: Feed name
|
:param feed: Feed name
|
||||||
:type feed: str
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
values = self._convert_data_to_dict(self.aio.receive_previous(feed))
|
values = self._convert_data_to_dict(self.aio.receive_previous(feed))
|
||||||
return values[0] if values else None
|
return values[0] if values else None
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def delete(self, feed, data_id):
|
def delete(self, feed: str, data_id: str):
|
||||||
"""
|
"""
|
||||||
Delete a data point from a feed
|
Delete a data point from a feed
|
||||||
|
|
||||||
:param feed: Feed name
|
:param feed: Feed name
|
||||||
:type feed: str
|
|
||||||
|
|
||||||
:param data_id: Data point ID to remove
|
:param data_id: Data point ID to remove
|
||||||
:type data_id: str
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self.aio.delete(feed, data_id)
|
self.aio.delete(feed, data_id)
|
||||||
|
|
||||||
|
def main(self):
|
||||||
|
if self.throttle_interval:
|
||||||
|
self._data_throttler_thread = Thread(target=self._data_throttler)
|
||||||
|
self._data_throttler_thread.start()
|
||||||
|
|
||||||
|
try:
|
||||||
|
while not self.should_stop():
|
||||||
|
cur_wait = 1
|
||||||
|
max_wait = 60
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._mqtt.connect()
|
||||||
|
cur_wait = 1
|
||||||
|
self._mqtt.loop_blocking()
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.warning(
|
||||||
|
'Adafruit IO connection error: %s, retrying in %d seconds',
|
||||||
|
e,
|
||||||
|
cur_wait,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.logger.exception(e)
|
||||||
|
self.wait_stop(cur_wait)
|
||||||
|
cur_wait = min(cur_wait * 2, max_wait)
|
||||||
|
finally:
|
||||||
|
self._stop_mqtt()
|
||||||
|
finally:
|
||||||
|
self._stop_data_throttler()
|
||||||
|
|
||||||
|
def _stop_mqtt(self):
|
||||||
|
if self._mqtt_client:
|
||||||
|
self._mqtt_client.disconnect()
|
||||||
|
self._mqtt_client = None
|
||||||
|
|
||||||
|
def _stop_data_throttler(self):
|
||||||
|
if self._data_throttler_thread:
|
||||||
|
self._data_throttler_thread.join()
|
||||||
|
self._data_throttler_thread = None
|
||||||
|
|
||||||
|
def _stop(self):
|
||||||
|
self._stop_mqtt()
|
||||||
|
self._stop_data_throttler()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stop()
|
||||||
|
super().stop()
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,5 +1,8 @@
|
||||||
manifest:
|
manifest:
|
||||||
events: {}
|
events:
|
||||||
|
- platypush.message.event.adafruit.AdafruitConnectedEvent
|
||||||
|
- platypush.message.event.adafruit.AdafruitDisconnectedEvent
|
||||||
|
- platypush.message.event.adafruit.AdafruitFeedUpdateEvent
|
||||||
install:
|
install:
|
||||||
pip:
|
pip:
|
||||||
- adafruit-io
|
- adafruit-io
|
||||||
|
|
|
@ -1,13 +1,23 @@
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
|
from threading import RLock
|
||||||
|
from typing import Iterable, Optional
|
||||||
|
|
||||||
from platypush.plugins import Plugin, action
|
from google.auth import jwt
|
||||||
|
from google.cloud import pubsub_v1 as pubsub # pylint: disable=no-name-in-module
|
||||||
|
from google.api_core.exceptions import AlreadyExists, NotFound
|
||||||
|
|
||||||
|
from platypush.config import Config
|
||||||
|
from platypush.message.event.google.pubsub import GooglePubsubMessageEvent
|
||||||
|
from platypush.plugins import RunnablePlugin, action
|
||||||
|
|
||||||
|
|
||||||
class GooglePubsubPlugin(Plugin):
|
class GooglePubsubPlugin(RunnablePlugin):
|
||||||
"""
|
"""
|
||||||
Send messages over a Google pub/sub instance.
|
Publishes and subscribes to Google Pub/Sub topics.
|
||||||
You'll need a Google Cloud active project and a set of credentials to use this plugin:
|
|
||||||
|
You'll need a Google Cloud active project and a set of credentials to use
|
||||||
|
this plugin:
|
||||||
|
|
||||||
1. Create a project on the `Google Cloud console
|
1. Create a project on the `Google Cloud console
|
||||||
<https://console.cloud.google.com/projectcreate>`_ if you don't have
|
<https://console.cloud.google.com/projectcreate>`_ if you don't have
|
||||||
|
@ -20,40 +30,58 @@ class GooglePubsubPlugin(Plugin):
|
||||||
|
|
||||||
3. Download the JSON service credentials file. By default Platypush
|
3. Download the JSON service credentials file. By default Platypush
|
||||||
will look for the credentials file under
|
will look for the credentials file under
|
||||||
``~/.credentials/platypush/google/pubsub.json``.
|
``<WORKDIR>/credentials/google/pubsub.json``.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
|
publisher_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Publisher'
|
||||||
subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
|
subscriber_audience = 'https://pubsub.googleapis.com/google.pubsub.v1.Subscriber'
|
||||||
default_credentials_file = os.path.join(
|
default_credentials_file = os.path.join(
|
||||||
os.path.expanduser('~'), '.credentials', 'platypush', 'google', 'pubsub.json'
|
Config.get_workdir(), 'credentials', 'google', 'pubsub.json'
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, credentials_file: str = default_credentials_file, **kwargs):
|
def __init__(
|
||||||
|
self,
|
||||||
|
credentials_file: str = default_credentials_file,
|
||||||
|
topics: Iterable[str] = (),
|
||||||
|
**kwargs,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
:param credentials_file: Path to the JSON credentials file for Google
|
:param credentials_file: Path to the JSON credentials file for Google
|
||||||
pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
|
pub/sub (default: ``~/.credentials/platypush/google/pubsub.json``)
|
||||||
|
:param topics: List of topics to subscribe. You can either specify the
|
||||||
|
full topic name in the format
|
||||||
|
``projects/<project_id>/topics/<topic_name>``, where
|
||||||
|
``<project_id>`` must be the ID of your Google Pub/Sub project, or
|
||||||
|
just ``<topic_name>`` - in such case it's implied that you refer to
|
||||||
|
the ``topic_name`` under the ``project_id`` of your service
|
||||||
|
credentials.
|
||||||
"""
|
"""
|
||||||
super().__init__(**kwargs)
|
super().__init__(**kwargs)
|
||||||
self.credentials_file = credentials_file
|
self.credentials_file = credentials_file
|
||||||
self.project_id = self.get_project_id()
|
self.project_id = self.get_project_id()
|
||||||
|
self.topics = topics
|
||||||
|
self._subscriber: Optional[pubsub.SubscriberClient] = None
|
||||||
|
self._subscriber_lock = RLock()
|
||||||
|
|
||||||
def get_project_id(self):
|
def get_project_id(self):
|
||||||
with open(self.credentials_file) as f:
|
with open(self.credentials_file) as f:
|
||||||
return json.load(f).get('project_id')
|
return json.load(f).get('project_id')
|
||||||
|
|
||||||
def get_credentials(self, audience: str):
|
def get_credentials(self, audience: str):
|
||||||
from google.auth import jwt
|
|
||||||
|
|
||||||
return jwt.Credentials.from_service_account_file(
|
return jwt.Credentials.from_service_account_file(
|
||||||
self.credentials_file, audience=audience
|
self.credentials_file, audience=audience
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _norm_topic(self, topic: str):
|
||||||
|
if not topic.startswith(f'projects/{self.project_id}/topics/'):
|
||||||
|
topic = f'projects/{self.project_id}/topics/{topic}'
|
||||||
|
return topic
|
||||||
|
|
||||||
@action
|
@action
|
||||||
def send_message(self, topic: str, msg, **kwargs):
|
def publish(self, topic: str, msg, **kwargs):
|
||||||
"""
|
"""
|
||||||
Sends a message to a topic
|
Publish a message to a topic
|
||||||
|
|
||||||
:param topic: Topic/channel where the message will be delivered. You
|
:param topic: Topic/channel where the message will be delivered. You
|
||||||
can either specify the full topic name in the format
|
can either specify the full topic name in the format
|
||||||
|
@ -65,19 +93,14 @@ class GooglePubsubPlugin(Plugin):
|
||||||
:param msg: Message to be sent. It can be a list, a dict, or a Message object
|
:param msg: Message to be sent. It can be a list, a dict, or a Message object
|
||||||
:param kwargs: Extra arguments to be passed to .publish()
|
:param kwargs: Extra arguments to be passed to .publish()
|
||||||
"""
|
"""
|
||||||
from google.cloud import pubsub_v1
|
|
||||||
from google.api_core.exceptions import AlreadyExists
|
|
||||||
|
|
||||||
credentials = self.get_credentials(self.publisher_audience)
|
credentials = self.get_credentials(self.publisher_audience)
|
||||||
publisher = pubsub_v1.PublisherClient(credentials=credentials)
|
publisher = pubsub.PublisherClient(credentials=credentials)
|
||||||
|
topic = self._norm_topic(topic)
|
||||||
if not topic.startswith(f'projects/{self.project_id}/topics/'):
|
|
||||||
topic = f'projects/{self.project_id}/topics/{topic}'
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
publisher.create_topic(topic)
|
publisher.create_topic(name=topic)
|
||||||
except AlreadyExists:
|
except AlreadyExists:
|
||||||
pass
|
self.logger.debug('Topic %s already exists', topic)
|
||||||
|
|
||||||
if isinstance(msg, (int, float)):
|
if isinstance(msg, (int, float)):
|
||||||
msg = str(msg)
|
msg = str(msg)
|
||||||
|
@ -88,5 +111,101 @@ class GooglePubsubPlugin(Plugin):
|
||||||
|
|
||||||
publisher.publish(topic, msg, **kwargs)
|
publisher.publish(topic, msg, **kwargs)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def send_message(self, topic: str, msg, **kwargs):
|
||||||
|
"""
|
||||||
|
Alias for :meth:`.publish`
|
||||||
|
"""
|
||||||
|
self.publish(topic=topic, msg=msg, **kwargs)
|
||||||
|
|
||||||
|
@action
|
||||||
|
def subscribe(self, topic: str):
|
||||||
|
"""
|
||||||
|
Subscribe to a topic.
|
||||||
|
|
||||||
|
:param topic: Topic/channel where the message will be delivered. You
|
||||||
|
can either specify the full topic name in the format
|
||||||
|
``projects/<project_id>/topics/<topic_name>``, where
|
||||||
|
``<project_id>`` must be the ID of your Google Pub/Sub project, or
|
||||||
|
just ``<topic_name>`` - in such case it's implied that you refer to
|
||||||
|
the ``topic_name`` under the ``project_id`` of your service
|
||||||
|
credentials.
|
||||||
|
"""
|
||||||
|
assert self._subscriber, 'Subscriber not initialized'
|
||||||
|
topic = self._norm_topic(topic)
|
||||||
|
subscription_name = '/'.join(
|
||||||
|
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._subscriber.create_subscription(name=subscription_name, topic=topic)
|
||||||
|
except AlreadyExists:
|
||||||
|
self.logger.debug('Subscription %s already exists', subscription_name)
|
||||||
|
|
||||||
|
self._subscriber.subscribe(subscription_name, self._message_callback(topic))
|
||||||
|
|
||||||
|
@action
|
||||||
|
def unsubscribe(self, topic: str):
|
||||||
|
"""
|
||||||
|
Unsubscribe from a topic.
|
||||||
|
|
||||||
|
:param topic: Topic/channel where the message will be delivered. You
|
||||||
|
can either specify the full topic name in the format
|
||||||
|
``projects/<project_id>/topics/<topic_name>``, where
|
||||||
|
``<project_id>`` must be the ID of your Google Pub/Sub project, or
|
||||||
|
just ``<topic_name>`` - in such case it's implied that you refer to
|
||||||
|
the ``topic_name`` under the ``project_id`` of your service
|
||||||
|
credentials.
|
||||||
|
"""
|
||||||
|
assert self._subscriber, 'Subscriber not initialized'
|
||||||
|
topic = self._norm_topic(topic)
|
||||||
|
subscription_name = '/'.join(
|
||||||
|
[*topic.split('/')[:2], 'subscriptions', topic.split('/')[-1]]
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._subscriber.delete_subscription(subscription=subscription_name)
|
||||||
|
except NotFound:
|
||||||
|
self.logger.debug('Subscription %s not found', subscription_name)
|
||||||
|
|
||||||
|
def _message_callback(self, topic):
|
||||||
|
def callback(msg):
|
||||||
|
data = msg.data.decode()
|
||||||
|
try:
|
||||||
|
data = json.loads(data)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.debug('Not a valid JSON: %s: %s', data, e)
|
||||||
|
|
||||||
|
msg.ack()
|
||||||
|
self._bus.post(GooglePubsubMessageEvent(topic=topic, msg=data))
|
||||||
|
|
||||||
|
return callback
|
||||||
|
|
||||||
|
def main(self):
|
||||||
|
credentials = self.get_credentials(self.subscriber_audience)
|
||||||
|
with self._subscriber_lock:
|
||||||
|
self._subscriber = pubsub.SubscriberClient(credentials=credentials)
|
||||||
|
|
||||||
|
for topic in self.topics:
|
||||||
|
self.subscribe(topic=topic)
|
||||||
|
|
||||||
|
self.wait_stop()
|
||||||
|
with self._subscriber_lock:
|
||||||
|
self._close()
|
||||||
|
|
||||||
|
def _close(self):
|
||||||
|
with self._subscriber_lock:
|
||||||
|
if self._subscriber:
|
||||||
|
try:
|
||||||
|
self._subscriber.close()
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.debug('Error while closing the subscriber: %s', e)
|
||||||
|
|
||||||
|
self._subscriber = None
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._close()
|
||||||
|
super().stop()
|
||||||
|
|
||||||
|
|
||||||
# vim:sw=4:ts=4:et:
|
# vim:sw=4:ts=4:et:
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
manifest:
|
manifest:
|
||||||
events: {}
|
events:
|
||||||
|
- platypush.message.event.google.pubsub.GooglePubsubMessageEvent
|
||||||
install:
|
install:
|
||||||
apk:
|
apk:
|
||||||
- py3-google-api-python-client
|
- py3-google-api-python-client
|
||||||
|
|
|
@ -35,10 +35,13 @@ mock_imports = [
|
||||||
"gi",
|
"gi",
|
||||||
"gi.repository",
|
"gi.repository",
|
||||||
"google",
|
"google",
|
||||||
|
"google.api_core",
|
||||||
|
"google.auth",
|
||||||
"google.assistant.embedded",
|
"google.assistant.embedded",
|
||||||
"google.assistant.library",
|
"google.assistant.library",
|
||||||
"google.assistant.library.event",
|
"google.assistant.library.event",
|
||||||
"google.assistant.library.file_helpers",
|
"google.assistant.library.file_helpers",
|
||||||
|
"google.cloud",
|
||||||
"google.oauth2.credentials",
|
"google.oauth2.credentials",
|
||||||
"googlesamples",
|
"googlesamples",
|
||||||
"googlesamples.assistant.grpc.audio_helpers",
|
"googlesamples.assistant.grpc.audio_helpers",
|
||||||
|
|
Loading…
Reference in a new issue