[//]: # (title: How to build your personal infrastructure for data collection and visualization)
[//]: # (description: Use Grafana, PostgreSQL, Mosquitto and Platypush to collect data points on your infrastructure and be the real owner of your own data.)
[//]: # (image: /img/data-visualization-1.png)
[//]: # (author: Fabio Manganiello <fabio@platypush.tech>)
[//]: # (published: 2019-10-16)
A smart home can generate and collect data. Lots of data. And there are currently a few outstanding issues with home-generated data:
- **Fragmentation**. You probably have your home weather station, your own motion detectors, security cameras, gas and smoke detectors, body sensors, GPS and fit trackers and smart plugs around. It's quite likely that most of these devices generate data, that such data will in most cases be accessible only through a proprietary app or web service, and that any integration with other services, or any room for tinkering and automation, will mostly depend on the benevolence of the developer or business in building third-party interfaces for such data. In this article, we'll explore how, thanks to open source solutions like Platypush, Grafana and Mosquitto, it's possible to overcome the fragmentation issue and “glue” together data sources that wouldn't otherwise be able to communicate or share data.
- **Ability to query**. Most of the hardware and data geeks out there won't settle for the ability to access their data through a gauge in an app or a timeline graph. Many of us want the ability to explore our own generated data in a structured way, preferably through SQL or any other query language, and we demand tailor-made dashboards to explore our data, not dumb mobile apps. The ability to generate custom monthly reports of our fit activities, query the countries where we've been in a certain range of time, or how much time we spent indoors in the past three months, or how many times the smoke detectors in our guest room went above threshold in the past week, is, for many of us, priceless, and often neglected by hardware and software makers. In this article we'll explore how to leverage an open-source relational database (PostgreSQL in this example) and some elementary data pipelines to dispatch and store your data on your private computers, ready to be queried or visualized however you like.
- **Privacy**. Many of the solutions or services I've mentioned in the previous examples come with their cloud-based infrastructure to store user data. While storing your data on somebody else's computers saves you the time and disk space required to invest in your local solution, it also comes with all the concerns related to (ehm) storing your data on somebody else's computer. That somebody else can decide if and how you can access your data, can decide to sell your data for profit, or can be hacked in one way or another. This can be especially worrisome if we're talking about data on your own body, location or house environment. A house-hosted data infrastructure bypasses the issue of third-party ownership of your data.
This article will analyze the building blocks to set up your data infrastructure and build automation on it. We'll see how to set up data collection and monitoring for a few use cases (temperature, humidity, gas, phone location and fit data) and how to build automation triggers based on such data.
## Dependencies setup
First, you'll need a RaspberryPi (or any similar clone) with Platypush. I assume that you've already got Platypush installed and configured. If not, please head to my previous article on [getting started with Platypush](https://blog.platypush.tech/article/Ultimate-self-hosted-automation-with-Platypush).
You'll also need a relational database installed on your device. The example in this article will rely on PostgreSQL, but any relational database will do the job. To install and configure PostgreSQL on Raspbian and create a database named `sensors`:
```shell
[sudo] apt-get install postgresql libpq-dev postgresql-client \
    postgresql-client-common -y
[sudo] systemctl restart postgresql.service
[sudo] su postgres
createuser pi -P --interactive
psql -U pi
> create database sensors;
```
We'll use the database to store the following information:
- System metrics
- Sensors data
- Smartphone and location data
- Fit data
You'll also need a message queue broker running on your RaspberryPi to dispatch messages with new data reads. Check [this Instructables tutorial](https://www.instructables.com/id/Installing-MQTT-BrokerMosquitto-on-Raspberry-Pi/) on how to get Mosquitto up and running on your RaspberryPi.
For some of the data measurements, we'll also need an MQTT client to test messages over the configured queue, for example to send measurements from a shell script. I like to use mqttcli for these purposes, as it's fast, lightweight and written in Go:
```shell
go get github.com/shirou/mqttcli
```
Finally, install Grafana as a web-based gateway to visualize your data:
```shell
[sudo] apt-get install grafana
[sudo] systemctl restart grafana-server
```
After starting the service, head to `http://your-pi:3000` and make sure that you see the Grafana splash screen. Create a new admin user and you're good to go for now.
Now that you've got all the fundamental pieces in place, it's time to set up your data collection pipeline and dashboard. Let's start by setting up the tables and data storage logic on your database.
## Database configuration
If you followed the instructions above then you'll have a PostgreSQL instance running on your RaspberryPi, accessible through the user `pi`, and a `sensors` database created for the purpose. In this section, I'll explain how to create the basic tables and the triggers to normalize the data. Keep in mind that your measurement tables might become quite large, depending on how much data you process and how often you process it. To keep the database size under control and to make queries efficient, it's important to provide normalized table structures enforced by triggers. I've prepared the following provisioning script for my purposes:
```sql
-- Temporary sensors table where we store the raw
-- measurements as received on the message queue
drop table if exists tmp_sensors cascade;
create table tmp_sensors(
    id         serial not null,
    host       varchar(64) not null,
    metric     varchar(255) not null,
    data       double precision,
    created_at timestamp with time zone default CURRENT_TIMESTAMP,
    primary key(id)
);

-- Table to store the hosts associated to the data points
drop table if exists sensor_host cascade;
create table sensor_host(
    id   serial not null,
    host varchar(64) unique not null,
    primary key(id)
);

-- Table to store the metrics
drop table if exists sensor_metric cascade;
create table sensor_metric(
    id     serial not null,
    metric varchar(255) unique not null,
    primary key(id)
);

-- Table to store the normalized data points
drop table if exists sensor_data cascade;
create table sensor_data(
    id         serial not null,
    host_id    integer not null,
    metric_id  integer not null,
    data       double precision,
    created_at timestamp with time zone default CURRENT_TIMESTAMP,
    primary key(id),
    foreign key(host_id) references sensor_host(id),
    foreign key(metric_id) references sensor_metric(id)
);

-- Define a stored procedure that normalizes new rows on tmp_sensors
-- by either creating or returning the associated host_id and metric_id,
-- creating a normalized representation of the row on sensor_data and
-- deleting the original raw entry on tmp_sensors.
create or replace function sync_sensors_data()
    returns trigger as
$$
begin
    insert into sensor_host(host) values(new.host)
    on conflict do nothing;

    insert into sensor_metric(metric) values(new.metric)
    on conflict do nothing;

    insert into sensor_data(host_id, metric_id, data) values(
        (select id from sensor_host where host = new.host),
        (select id from sensor_metric where metric = new.metric),
        new.data
    );

    delete from tmp_sensors where id = new.id;
    return new;
end;
$$
language 'plpgsql';

-- Create a trigger that invokes the stored procedure defined above
-- after a row is inserted on tmp_sensors
drop trigger if exists on_sensor_data_insert on tmp_sensors;
create trigger on_sensor_data_insert
    after insert on tmp_sensors
    for each row
    execute procedure sync_sensors_data();

-- View that joins the normalized tables back into readable rows
create view public.vsensors AS
    select d.id AS data_id,
           h.host,
           m.metric,
           d.data,
           d.created_at
    from ((public.sensor_data d
        join public.sensor_host h ON ((d.host_id = h.id)))
        join public.sensor_metric m ON ((d.metric_id = m.id)));
```
The script above will keep the data on your database normalized and query-friendly even if the messages pushed on the message queue don't know the right numeric `host_id` or `metric_id`. Run it against your PostgreSQL instance:
```shell
psql -U pi < database_provisioning.sql
```
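If you want to see what the trigger does before touching your PostgreSQL instance, the same staging-table pattern can be reproduced in a few lines of Python. This is only a sketch under stated assumptions: it uses an in-memory SQLite database as a stand-in (so the trigger syntax and `insert or ignore` differ from the PostgreSQL script above), and `build_db`, `insert_raw` and `count_rows` are hypothetical helper names.

```python
import sqlite3

# SQLite approximation of the provisioning script (assumption: "insert or
# ignore" stands in for PostgreSQL's "on conflict do nothing").
SCHEMA = """
create table tmp_sensors(
    id integer primary key autoincrement,
    host text not null,
    metric text not null,
    data real
);
create table sensor_host(
    id integer primary key autoincrement,
    host text unique not null
);
create table sensor_metric(
    id integer primary key autoincrement,
    metric text unique not null
);
create table sensor_data(
    id integer primary key autoincrement,
    host_id integer not null references sensor_host(id),
    metric_id integer not null references sensor_metric(id),
    data real,
    created_at text default CURRENT_TIMESTAMP
);
create trigger on_sensor_data_insert after insert on tmp_sensors
begin
    insert or ignore into sensor_host(host) values (new.host);
    insert or ignore into sensor_metric(metric) values (new.metric);
    insert into sensor_data(host_id, metric_id, data) values (
        (select id from sensor_host where host = new.host),
        (select id from sensor_metric where metric = new.metric),
        new.data
    );
    delete from tmp_sensors where id = new.id;
end;
"""


def build_db() -> sqlite3.Connection:
    """Create the in-memory database with tables and trigger."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    return conn


def insert_raw(conn: sqlite3.Connection, host: str, metric: str, value: float) -> None:
    """Insert a raw measurement; the trigger moves it to sensor_data."""
    conn.execute(
        "insert into tmp_sensors(host, metric, data) values (?, ?, ?)",
        (host, metric, value),
    )
    conn.commit()


def count_rows(conn: sqlite3.Connection, table: str) -> int:
    """Return the number of rows in one of the tables defined above."""
    return conn.execute(f"select count(*) from {table}").fetchone()[0]
```

After a few calls to `insert_raw`, `tmp_sensors` stays empty while `sensor_data` grows, and each host and metric name is stored only once in its lookup table.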
Now that you've got the tables ready, it's time to fill them with data. We'll see a few examples of metrics collection, starting with system metrics.
## System metrics
You may want to monitor the CPU, RAM or disk usage of your own RaspberryPi or any other host or virtual server you've got around, and do things like setting up a dashboard to easily monitor your metrics or setting up alerts in case something goes out of control.
First, create a script that checks the memory available on your system and sends the percentage of used memory on a message queue channel. We'll store this script under `~/bin/send_mem_stats.sh` for the purposes of this tutorial:
```shell
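#!/bin/bash

# Total and available memory (in MB) from the "Mem:" row of `free`
total=$(free -m | head -2 | tail -1 | awk '{print $2}')
avail=$(free -m | head -2 | tail -1 | awk '{print $7}')

# Percentage of used memory, rounded to two decimals
used=$(awk -v total="$total" -v avail="$avail" \
    'BEGIN {printf "%.2f", (total - avail) / total * 100}')

export MQTT_HOST=your-mqtt-server
export MQTT_PORT=1883
$HOME/go/bin/mqttcli pub -t "sensors/$(hostname)/memory" -m "$used"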
```
Then schedule it to run at regular intervals (e.g. every 5 minutes) in your crontab:
```
*/5 * * * * /bin/bash /home/pi/bin/send_mem_stats.sh
```
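The same calculation can also be done without parsing the columns of `free`. The sketch below assumes a Linux host whose `/proc/meminfo` exposes the `MemTotal` and `MemAvailable` fields; `used_memory_percent` is a hypothetical helper name, and it takes the file content as a string so that it can be tried on any text.

```python
def used_memory_percent(meminfo_text: str) -> float:
    """Return the percentage of used memory from /proc/meminfo content."""
    fields = {}
    for line in meminfo_text.splitlines():
        key, _, rest = line.partition(':')
        parts = rest.split()
        # Keep only the "Name: <integer> [unit]" rows
        if parts and parts[0].isdigit():
            fields[key.strip()] = int(parts[0])

    total = fields['MemTotal']
    available = fields['MemAvailable']
    return round((total - available) / total * 100, 2)
```

On the RaspberryPi you would call it as `used_memory_percent(open('/proc/meminfo').read())` and publish the result on the same topic used by the shell script.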
You can write similar scripts for other system stats, for example to monitor the root disk usage:
```shell
#!/bin/bash
usage=$(df -h | egrep '/$' | awk '{print $5}' | tr -d % | awk '{printf "%.2f", ($0/100)}')
export MQTT_HOST=your-mqtt-server
export MQTT_PORT=1883
$HOME/go/bin/mqttcli pub -t "sensors/$(hostname)/disk_root" -m $usage
```
Alternatively, you can also write the logic for sending system data points directly in Platypush -
e.g. using the [`system`](https://docs.platypush.tech/en/latest/platypush/plugins/system.html)
plugin - and that will be executed while the service is running, so you won't need the `mqttcli` dependency:
```python
from platypush.config import Config
from platypush.cron import cron
from platypush.utils import run


# Publish the memory usage percentage every 5 minutes
@cron('*/5 * * * *')
def send_memory_stats(**context):
    mem = run('system.mem_virtual')
    run('mqtt.publish', host='your-mqtt-server', port=1883,
        topic=f'sensors/{Config.get("device_id")}/memory',
        msg=mem['percent'])
```
You can extend this pattern to any sensor data you want to send over the queue.
Once scheduled, these jobs will start pushing data to your message queue at regular intervals, on the configured topics (in the examples above, `sensors/<hostname>/memory` and `sensors/<hostname>/disk_root` respectively).
It's now time to set up Platypush to listen on those channels and store each new message in the database you have provisioned. Add the following configuration to your `~/.config/platypush/config.yaml` file:
```yaml
# Enable the MQTT backend
backend.mqtt:
    host: your-mqtt-server
    port: 1883

    # Configure platypush to listen for new messages on these topics
    listeners:
        - host: your-mqtt-server
          topics:
              - sensors/host1/disk_root
              - sensors/host2/disk_root
              - sensors/host1/memory
              - sensors/host2/memory
```
And create an event hook (e.g. under `~/.config/platypush/scripts/mqtt.py`) that stores the messages
received on some specified channels to your database:
```python
from platypush.event.hook import hook
from platypush.utils import run
from platypush.message.event.mqtt import MQTTMessageEvent

db_engine = 'postgresql+pg8000://pi:your-password@localhost/sensors'


@hook(MQTTMessageEvent)
def on_mqtt_message(event, **context):
    # Only handle the topics published by our sensors
    if not event.topic.startswith('sensors/'):
        return

    # Topics have the format sensors/<host>/<metric>
    (prefix, host, metric) = event.topic.split('/')
    run('db.insert',
        engine=db_engine,
        table='tmp_sensors',
        records=[{
            'host': host,
            'metric': metric,
            'data': event.msg,
        }]
    )
```
By inserting the data into `tmp_sensors` we make sure that the triggers that we previously declared on
the database will be executed and data will be normalized.
Start Platypush, and if everything went smoothly you'll soon see your `sensor_data` table getting populated with memory
and disk usage stats.
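
The hook above assumes that every topic has exactly three segments and that every payload is numeric. If other clients publish on the same broker, it may be worth validating messages before they reach the database. A minimal sketch of such a check (`parse_sensor_message` is a hypothetical helper, not part of Platypush):

```python
from typing import Optional


def parse_sensor_message(topic: str, payload) -> Optional[dict]:
    """Turn a sensors/<host>/<metric> message into a tmp_sensors record.

    Returns None if the topic or the payload don't have the expected format.
    """
    parts = topic.split('/')
    if len(parts) != 3 or parts[0] != 'sensors' or not all(parts):
        return None

    try:
        # MQTT payloads may arrive as raw bytes
        if isinstance(payload, bytes):
            payload = payload.decode()
        value = float(payload)
    except (TypeError, ValueError):
        return None

    return {'host': parts[1], 'metric': parts[2], 'data': value}
```

Inside the hook you could then pass the returned dictionary straight to `records=[...]` and skip the insert when the function returns `None`.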
## Sensors data
Commercial weather stations, air quality solutions and presence detectors can be relatively expensive, and relatively
limited when it comes to opening up their data, but by using the ingredients we've talked about so far it's relatively
easy to set up your network of sensors around the house and get them to collect data on your existing data
infrastructure. Let's consider for the purposes of this post an example that collects temperature and humidity
measurements from some sensors around the house. You've got three main options when it comes to setting up analog
sensors on a RaspberryPi:
- *Option 1*: Use a microcontroller with analog inputs (like an Arduino or ESP8266) connected to your RaspberryPi over
USB and configure Platypush to read analog measurements over the serial port. The RaspberryPi is an amazing piece of
technology but it doesn't come with a native ADC. That means that many simple analog sensors available on the market
that map different environment values to different voltage values won't work on a RaspberryPi unless you use a device
in between that can actually read the analog measurements and push them to the RaspberryPi over a serial interface. For
my purposes I often use Arduino Nano clones, as they're usually quite cheap, but any device that can communicate over
a USB/serial port should do the job. You can find cheap but accurate temperature and humidity sensors on the internet,
like the [TMP36](https://shop.pimoroni.com/products/temperature-sensor-tmp36), [DHT11](https://learn.adafruit.com/dht)
and [AM2320](https://shop.pimoroni.com/products/digital-temperature-and-humidity-sensor), that can easily be set up to
communicate with your Arduino/ESP* device. All you need is to make sure that your Arduino/ESP* device writes a valid
JSON message to the serial port whenever it performs a new measurement (e.g. `{"temperature": 21.0, "humidity":
45.0}`), so Platypush can easily understand when there is a change in value for a certain measurement.
- *Option 2*: Devices like the ESP8266 already come with a Wi-Fi module and can directly send messages over MQTT through
small MicroPython libraries
like [`umqttsimple`](https://raw.githubusercontent.com/RuiSantosdotme/ESP-MicroPython/master/code/MQTT/umqttsimple.py)
(check out [this tutorial](https://randomnerdtutorials.com/micropython-mqtt-esp32-esp8266/) for ESP8266+MQTT setup).
In this case you won't need a serial connection, and you can send data from your sensor to your MQTT server directly
from the device.
- *Option 3*: Use a breakout sensor (like
the [BMP280](https://shop.pimoroni.com/products/bmp280-breakout-temperature-pressure-altitude-sensor),
[SHT31](https://shop.pimoroni.com/products/adafruit-sensiron-sht31-d-temperature-humidity-sensor-breakout) or
[HTU21D-F](https://shop.pimoroni.com/products/adafruit-htu21d-f-temperature-humidity-sensor-breakout-board)) that
communicates over I2C/SPI and that you can plug directly into the RaspberryPi. If you go for this solution then you
won't need another microcontroller to deal with the ADC conversion, but you'll also have to make sure that these devices
come with a Python library and that they're [supported in Platypush](https://docs.platypush.tech/en/latest/) (feel free
to open an issue or send a pull request if that's not the case).
Let's briefly analyze an example of the option 1 implementation. Let's suppose that you have an Arduino with a DHT11
temperature and humidity sensor connected on pin 7. You can prepare a sketch that looks like this to send new
measurements over USB to the RaspberryPi in JSON format:
```c
#include <Arduino.h>
#include <dht.h>

#define DHT11_PIN 7

dht DHT;

void setup() {
    Serial.begin(9600);
}

void loop() {
    int ret = DHT.read11(DHT11_PIN);

    // Skip this cycle if the library reports an error code below -1
    if (ret < -1) {
        delay(1000);
        return;
    }

    // Write the measurement as a single-line JSON object
    Serial.print("{\"temperature\":");
    Serial.print(DHT.temperature);
    Serial.print(", \"humidity\":");
    Serial.print(DHT.humidity);
    Serial.println("}");
    delay(1000);
}
```
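On the receiving side, each line written by the sketch is expected to be one JSON object. Serial links occasionally deliver partial or garbled lines (for example right after the board resets), so it helps to picture the parsing step as something like the following. This is an illustrative sketch, not the code Platypush runs internally, and `parse_reading` and `EXPECTED_KEYS` are hypothetical names:

```python
import json
from typing import Optional

# Keys that the Arduino sketch above writes on each line
EXPECTED_KEYS = ('temperature', 'humidity')


def parse_reading(line: str) -> Optional[dict]:
    """Parse one serial line into a dict of numeric measurements.

    Returns None if the line isn't a JSON object or if it carries none of
    the expected numeric values.
    """
    try:
        data = json.loads(line)
    except ValueError:
        return None

    if not isinstance(data, dict):
        return None

    reading = {}
    for key in EXPECTED_KEYS:
        value = data.get(key)
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            reading[key] = float(value)

    return reading or None
```

Lines that fail to parse are simply dropped, which is usually acceptable for a sensor that reports once per second.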
Install the Platypush serial plugin dependencies:
```shell
[sudo] pip install 'platypush[serial]'
```
Then you can add the following lines into the `~/.config/platypush/config.yaml` file of the RaspberryPi that has the
sensors connected to forward new measurements to the message queue, and store them on your local database. The example
also shows how to tweak polling period, tolerance and thresholds:
```yaml
# Enable the serial plugin and specify
# the path to your Arduino/Esp* device
serial:
    device: /dev/ttyUSB0

# Enable the serial sensor backend to
# listen for changes in the metrics
backend.sensor.serial:
    # How often we should poll for new data
    poll_seconds: 5.0

    # Which sensors should be enabled. These are
    # the keys in the JSON you'll be sending over serial
    enabled_sensors:
        - temperature
        - humidity

    # Specify the tolerance for the metrics. A new
    # measurement event will be triggered only if
    # the absolute value difference between the value in
    # the latest event and the value in the current
    # measurement is higher than these thresholds.
    # If no tolerance value is set for a specific metric
    # then new events will be triggered whenever we've
    # got new values, as long as they're different from
    # the previous, no matter the difference.
    tolerance:
        temperature: 0.25
        humidity: 0.5

    # Specify optional thresholds for the metrics. A new
    # sensor above/below threshold event will be triggered
    # when the value of that metric goes above/below the
    # configured threshold.
    thresholds:
        humidity: 70.0

        # You can also specify multiple thresholds values for a metric
        temperature:
            - 20.0
            - 25.0
            - 30.0
```
[`backend.sensor.serial`](https://docs.platypush.tech/en/latest/platypush/backend/sensor.serial.html) (and, in
general, any sensor backend) will trigger
a [`SensorDataChangeEvent`](https://docs.platypush.tech/en/latest/platypush/events/sensor.html#platypush.message.event.sensor.SensorDataChangeEvent)
when new sensor data is available, and
[`SensorDataBelowThresholdEvent`](https://docs.platypush.tech/en/latest/platypush/events/sensor.html#platypush.message.event.sensor.SensorDataBelowThresholdEvent) /
[`SensorDataAboveThresholdEvent`](https://docs.platypush.tech/en/latest/platypush/events/sensor.html#platypush.message.event.sensor.SensorDataAboveThresholdEvent)
when the new sensor data is, respectively, below or above one of the configured thresholds.
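
To make the `tolerance` and `thresholds` settings more concrete, here is how the rules described in the configuration comments could be expressed in plain Python. It's a sketch of the documented behaviour under my own assumptions (in particular about how threshold crossings are detected), not the backend's actual implementation, and both function names are hypothetical:

```python
from typing import List, Optional, Tuple


def exceeds_tolerance(previous: Optional[float], current: float,
                      tolerance: float = 0.0) -> bool:
    """True if the new value differs from the last reported one by more than
    the tolerance. With no tolerance, any different value counts."""
    if previous is None:
        return True
    return abs(current - previous) > tolerance


def crossed_thresholds(previous: float, current: float,
                       thresholds: List[float]) -> List[Tuple[str, float]]:
    """Return ('above', t) or ('below', t) for each threshold t that lies
    between the previous and the current value."""
    events = []
    for threshold in thresholds:
        if previous <= threshold < current:
            events.append(('above', threshold))
        elif previous >= threshold > current:
            events.append(('below', threshold))
    return events
```

With the configuration above, a temperature moving from 24.0 to 26.0 would pass the 0.25 tolerance and cross the 25.0 threshold upwards, while a move from 21.0 to 21.2 would be ignored.
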
We can now configure an event hook to send new sensor data to MQTT to be stored on the database by dropping another
script into `~/.config/platypush/scripts`:
```python
from platypush.config import Config
from platypush.event.hook import hook
from platypush.utils import run
from platypush.message.event.sensor import SensorDataChangeEvent


@hook(SensorDataChangeEvent)
def on_sensor_data(event, **context):
    hostname = Config.get('device_id')

    # Forward only the metrics that are present in this event
    for metric in ['temperature', 'humidity']:
        if metric in event.data:
            run('mqtt.publish', topic=f'sensors/{hostname}/{metric}',
                host='your-mqtt-server', port=1883, msg=event.data[metric])
```
Just remember to add `sensors/your-rpi/temperature`, `sensors/your-rpi/humidity` and any other MQTT topic that you want
to monitor to the list of topics watched by `backend.mqtt` on the MQTT/database host.
You can also trigger actions when some sensor data goes above or below a configured threshold - for instance,
turn on/off the lights if the luminosity sensor goes below/above threshold, or turn on/off the fan if the temperature
sensor goes above/below a certain threshold:
```python
from platypush.event.hook import hook
from platypush.utils import run
from platypush.message.event.sensor import \
    SensorDataAboveThresholdEvent, SensorDataBelowThresholdEvent


# The two hooks need distinct names, otherwise the
# second definition would shadow the first one
@hook(SensorDataAboveThresholdEvent)
def on_sensor_above_threshold(event, **context):
    if 'luminosity' in event.data:
        run('light.hue.off')

    if 'temperature' in event.data:
        run('switch.tplink.on', device='Fan')


@hook(SensorDataBelowThresholdEvent)
def on_sensor_below_threshold(event, **context):
    if 'luminosity' in event.data:
        run('light.hue.on')

    if 'temperature' in event.data:
        run('switch.tplink.off', device='Fan')
```
This logic isn't limited to sensor events sent over a serial interface like an Arduino or ESP8266. If you have sensors
that communicate over e.g. Zigbee, Z-Wave or Bluetooth, you can also configure them in Platypush through the respective
backends and react to their events.
## Smartphone and location data
Our smartphones also generate a lot of data that would be nice to track on our new data infrastructure and automate to
make our lives easier. We'll show in this example how to leverage [Tasker](https://tasker.joaoapps.com/),
[Pushbullet](https://www.pushbullet.com/) and [AutoLocation](https://joaoapps.com/autolocation/) on your Android
device to regularly check your location, store it on your local database (so you can turn off Google's creepy
location history, sorry Google) and implement smart rules such as turning on lighting and heating and saying a welcome
message when you arrive home.
Let's first see how to store your phone location data to your local database.
- Install the Pushbullet, Tasker and AutoLocation apps on your phone.
- Head to your Pushbullet account settings page and create a new access token.
- Enable the Pushbullet backend on the platypush installation on your database host. Lines to add to your `config.yaml`:
```yaml
backend.pushbullet:
    token: your-token
    device: platypush
```
- Add a table to your database to store location data:
```sql
CREATE TABLE location_history (
    id         serial NOT NULL,
    latitude   double precision,
    longitude  double precision,
    altitude   double precision,
    created_at timestamp with time zone DEFAULT CURRENT_TIMESTAMP,
    primary key(id)
);
```
- Create an event hook that listens for specifically formatted Pushbullet notes (e.g. `LATLNG#7.8712,57.3123`) that
contain lat/long information and stores them on your PostgreSQL database:
```python
from platypush.event.hook import hook
from platypush.utils import run
from platypush.message.event.pushbullet import PushbulletEvent

db_engine = 'postgresql+pg8000://pi:your-password@localhost/sensors'


@hook(PushbulletEvent)
def on_push_data(event, **context):
    # Notes have the format LATLNG#<latitude>,<longitude>
    if event.body.startswith('LATLNG'):
        run('db.insert', engine=db_engine, table='location_history',
            records=[{
                'latitude': event.body[len('LATLNG#'):].split(',')[0],
                'longitude': event.body[len('LATLNG#'):].split(',')[1],
            }]
        )
```
You can also configure Tasker to use a 3rd-party app to send messages directly to MQTT (like *Join* or *MQTT Client*)
so you don't have to stuff data into Pushbullet notes, but for the purpose of this example we'll analyze the Pushbullet
way because it's easier to set up.
- Create a Tasker task that runs every 10 minutes (or 5, or 20, or however often you like) to update your location by
sending a Pushbullet note to your Platypush virtual device:
![Tasker create new task](../img/tasker-screen-1.jpeg)
![Tasker create new task](../img/tasker-screen-2.jpeg)
![Tasker create new task](../img/tasker-screen-3.jpeg)
After saving the Tasker profile your smartphone will start sending its location data at regular intervals to Pushbullet,
your RaspberryPi will intercept those notifications and store the data on your local database. Time to ditch third-party
location trackers for good!
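
The `LATLNG#<lat>,<long>` note format used by the hook above is easy to get wrong on the phone side (a missing field, a localized decimal separator), and a malformed note would end up as a failed insert. A defensive parser could look like the following sketch, where `parse_latlng` is a hypothetical helper and the range checks are my own addition:

```python
from typing import Optional, Tuple

PREFIX = 'LATLNG#'


def parse_latlng(body: str) -> Optional[Tuple[float, float]]:
    """Extract (latitude, longitude) from a LATLNG#<lat>,<long> note.

    Returns None if the note is malformed or the coordinates are out of range.
    """
    if not body.startswith(PREFIX):
        return None

    fields = body[len(PREFIX):].split(',')
    if len(fields) != 2:
        return None

    try:
        lat, lng = float(fields[0]), float(fields[1])
    except ValueError:
        return None

    if not (-90 <= lat <= 90 and -180 <= lng <= 180):
        return None

    return lat, lng
```

The hook would then call `parse_latlng(event.body)` and insert a record only when the result is not `None`.
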
How about running custom actions when you enter or exit your home area? Let's create a Tasker profile that, based on
AutoLocation lat/long data, detects when you enter or exit a certain area.
![Tasker create new task](../img/tasker-screen-4.jpeg)
The task will simply send a Pushbullet note to your Platypush virtual device that contains `HOME#1` (you entered your
home area) or `HOME#0` (you exited your home area).
![Tasker create new task](../img/tasker-screen-5.jpeg)
![Tasker create new task](../img/tasker-screen-6.jpeg)
Add an event hook to intercept the notification and run your custom logic:
```python
from platypush.context import get_plugin
from platypush.event.hook import hook
from platypush.message.event.pushbullet import PushbulletEvent


@hook(PushbulletEvent)
def on_home_push_data(event, **context):
    if not event.body.startswith('HOME#'):
        return

    # Initialize the plugins
    # Note that get_plugin('plugin').method() and
    # run('plugin.method') can be used interchangeably
    variable = get_plugin('variable')
    lights = get_plugin('light.hue')
    music = get_plugin('music.mpd')
    tts = get_plugin('tts')

    # Get the AT_HOME status
    at_home = int(event.body.split('#')[1])

    # Get the previous AT_HOME status
    prev_at_home = int(variable.get('AT_HOME').get('AT_HOME', 0))

    if at_home and not prev_at_home:
        # Example: turn on the lights, start playing the music and
        # say a welcome message
        lights.on()
        tts.say(text='Welcome home')
        music.play()
    elif not at_home and prev_at_home:
        # Turn off the lights and stop the music
        lights.off()
        music.stop()

    # Store the new AT_HOME status
    variable.set(AT_HOME=at_home)
```
With the simple ingredients shown so far, it is relatively straightforward to connect events on your phone to your smart
home infrastructure, as long as you've got a Tasker plugin on your smartphone for achieving what you want to do.
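
In this setup the geofencing is done on the phone by AutoLocation. If you'd rather derive the same "at home" signal on the server from the coordinates stored in `location_history`, a great-circle distance check is enough. The sketch below uses the haversine formula; the home coordinates and the radius are made-up placeholder values, and the function names are hypothetical:

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two points given in degrees."""
    dlat = radians(lat2 - lat1)
    dlon = radians(lon2 - lon1)
    a = (sin(dlat / 2) ** 2
         + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def is_at_home(lat: float, lon: float,
               home=(52.0, 4.0), radius_km: float = 0.2) -> bool:
    """True if the point is within radius_km of the (placeholder) home."""
    return haversine_km(lat, lon, home[0], home[1]) <= radius_km
```

The result of `is_at_home` could feed the same `AT_HOME` variable logic used in the hook above.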
## Fit data
The explosion of smartwatches, fit trackers, body sensors and smart fit algorithms running on our phones in the last
few years has opened the gates to an authentic revolution for health and fit technologies. However, such a revolution
is still failing to reach its full potential because of the fragmentation of the market and the limited possibilities
when it comes to visualizing and querying data. Most of the health and fit solutions come with their walled garden app:
you can only access the data using the app provided by the developer, and you can only use that app to access the data
generated by your specific sensors. The lack of integration between solutions has turned what could be a revolution in
the way we measure the data generated by our bodies into cool pieces of tech that we like to show to friends without
much practical utility. In the last few years, some steps forward have been made by Google Fit; more and more products
nowadays can synchronize their data to Google Fit (and my advice is to steer clear of those that don't: they're nothing
more than shiny toys with no practical utility). However, although Google Fit allows you to have a single view on your
body data even if the data points are collected by different sensors, it's still very limited when it comes to
providing you with a powerful way to query, compare and visualize your data. The web service was killed a while ago,
and that means that the only way to access your data is through the (frankly very limited) mobile app. And you've got
no way to perform more advanced queries, such as comparisons of data between different periods, finding the day during
the month when you've walked or slept the most, or even just visualizing the data on a computer, unless you make your
own program leveraging the Fit API.
Luckily, Platypush comes with a handy Google Fit
[backend](https://docs.platypush.tech/en/latest/platypush/backend/google.fit.html)
and [plugin](https://docs.platypush.tech/en/latest/platypush/plugins/google.fit.html), and you can
leverage them to easily build your own visualization, automation and queryable fit database.
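
Once the data points live in your own database, a question like "on which day did I walk the most?" becomes a few lines of code. The sketch below works on plain tuples so that it's independent of any schema: the `(start_time, value)` row shape and the `busiest_day` name are assumptions made for illustration.

```python
from collections import defaultdict
from datetime import datetime


def busiest_day(rows):
    """Return (date, total) for the day with the highest summed value.

    rows is an iterable of (start_time, value) tuples, e.g. step deltas.
    Returns None if there are no rows.
    """
    totals = defaultdict(float)
    for start_time, value in rows:
        totals[start_time.date()] += value

    if not totals:
        return None

    return max(totals.items(), key=lambda item: item[1])
```

For example, feeding it the step count deltas of one month returns the date with the most steps and the total for that day.
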
- Prepare the fit tables on your database. Again, we'll leverage a trigger to take care of the normalization:
```sql
--
-- tmp_fit_data table setup
--
drop sequence if exists tmp_fit_data_seq cascade;
create sequence tmp_fit_data_seq;

drop table if exists tmp_fit_data cascade;
create table tmp_fit_data(
    id               integer not null default nextval('tmp_fit_data_seq'),
    username         varchar(255) not null default 'me',
    data_source      varchar(1024) not null,
    orig_data_source varchar(1024),
    data_type        varchar(255) not null,
    value            float,
    json_value       jsonb,
    start_time       timestamp with time zone not null,
    end_time         timestamp with time zone not null,
    primary key(id)
);

alter sequence tmp_fit_data_seq owned by tmp_fit_data.id;

--
-- fit_user table setup
--
drop sequence if exists fit_user_seq cascade;
create sequence fit_user_seq;

drop table if exists fit_user cascade;
create table fit_user(
    id   integer not null default nextval('fit_user_seq'),
    name varchar(255) unique not null,
    primary key(id)
);

alter sequence fit_user_seq owned by fit_user.id;

--
-- fit_data_source table setup
--
drop sequence if exists fit_data_source_seq cascade;
create sequence fit_data_source_seq;

drop table if exists fit_data_source cascade;
create table fit_data_source(
    id   integer not null default nextval('fit_data_source_seq'),
    name varchar(255) unique not null,
    primary key(id)
);

alter sequence fit_data_source_seq owned by fit_data_source.id;

--
-- fit_data_type table setup
--
drop sequence if exists fit_data_type_seq cascade;
create sequence fit_data_type_seq;

drop table if exists fit_data_type cascade;
create table fit_data_type(
    id   integer not null default nextval('fit_data_type_seq'),
    name varchar(255) unique not null,
    primary key(id)
);

alter sequence fit_data_type_seq owned by fit_data_type.id;

--
-- fit_data table setup
--
drop sequence if exists fit_data_seq cascade;
create sequence fit_data_seq;

drop table if exists fit_data cascade;
create table fit_data(
    id                  integer not null default nextval('fit_data_seq'),
    user_id             integer not null,
    data_source_id      integer not null,
    orig_data_source_id integer,
    data_type_id        integer not null,
    value               float,
    json_value          jsonb,
    start_time          timestamp with time zone not null,
    end_time            timestamp with time zone not null,
    primary key(id),
    foreign key(user_id) references fit_user(id),
    foreign key(data_source_id) references fit_data_source(id),
    foreign key(orig_data_source_id) references fit_data_source(id),
    foreign key(data_type_id) references fit_data_type(id)
);

alter sequence fit_data_seq owned by fit_data.id;

--
-- Sync fit_data table trigger setup
--
create or replace function sync_fit_data()
    returns trigger as
$$
begin
    insert into fit_user(name) values(new.username)
    on conflict do nothing;

    insert into fit_data_source(name) values(new.data_source)
    on conflict do nothing;

    -- orig_data_source is optional: register it only when it's set,
    -- otherwise the insert would violate the not null constraint on name
    if new.orig_data_source is not null then
        insert into fit_data_source(name) values(new.orig_data_source)
        on conflict do nothing;
    end if;

    insert into fit_data_type(name) values(new.data_type)
    on conflict do nothing;

    insert into fit_data(user_id, data_source_id, orig_data_source_id, data_type_id, value, json_value, start_time, end_time) values(
        (select id from fit_user u where u.name = new.username),
        (select id from fit_data_source ds where ds.name = new.data_source),
        (select id from fit_data_source ds where ds.name = new.orig_data_source),
        (select id from fit_data_type dt where dt.name = new.data_type),
        new.value, new.json_value, new.start_time, new.end_time
    );

    delete from tmp_fit_data where id = new.id;
    return new;
end;
$$
language 'plpgsql';

drop trigger if exists on_tmp_fit_data_insert on tmp_fit_data;
create trigger on_tmp_fit_data_insert
    after insert on tmp_fit_data
    for each row
    execute procedure sync_fit_data();

--
-- vfit view definition
--
drop view if exists vfit;
create view vfit as
    select d.id
         , u.name as username
         , ds.name as data_source
         , ods.name as orig_data_source
         , dt.name as data_type
         , value
         , json_value
         , start_time
         , end_time
    from fit_data d
    join fit_user u on d.user_id = u.id
    join fit_data_source ds on d.data_source_id = ds.id
    left join fit_data_source ods on d.orig_data_source_id = ods.id
    join fit_data_type dt on d.data_type_id = dt.id;
```
- Head to the [Google developers console](https://console.developers.google.com/) and get your credentials JSON file:
![Google Fit developers console](../img/google-fit-1.png)
- Run the following command to authorize Platypush to access your Fit data:
```shell
python -m platypush.plugins.google.credentials \
"https://www.googleapis.com/auth/fitness.activity.read
https://www.googleapis.com/auth/fitness.body.read
https://www.googleapis.com/auth/fitness.body_temperature.read
https://www.googleapis.com/auth/fitness.location.read" \
/path/to/your/credentials.json \
--noauth_local_webserver
```
- With Platypush running, check the data sources that are available on your account:
```shell
curl -XPOST \
-H "Authorization: Bearer $PP_TOKEN" \
-H 'Content-Type: application/json' -d '
{
"type":"request",
"action":"google.fit.get_data_sources"
}' http://your-pi:8008/execute
```
- Take note of the `dataStreamId` attributes of the metrics that you want to monitor and add them to the configuration
of the Google Fit backend:
```yaml
backend.google.fit:
    poll_seconds: 1800
    data_sources:
        - derived:com.google.weight:com.google.android.gms:merge_weight
        - derived:com.google.calories.bmr:com.google.android.gms:merged
        - derived:com.google.distance.delta:com.google.android.gms:platform_distance_delta
        - derived:com.google.speed:com.google.android.gms:merge_speed
        - derived:com.google.step_count.delta:com.google.android.gms:merge_step_deltas
        - derived:com.google.heart_rate.bpm:com.google.android.gms:merge_heart_rate_bpm
        - derived:com.google.calories.expended:com.google.android.gms:from_activities
        - derived:com.google.calories.expended:com.google.android.gms:from_bmr
        - derived:com.google.calories.expended:com.google.android.gms:platform_calories_expended
        - derived:com.google.activity.segment:com.google.android.gms:platform_activity_segments
        - derived:com.google.activity.segment:com.google.android.gms:merge_activity_segments
        - derived:com.google.activity.segment:com.google.android.gms:session_activity_segment
        - derived:com.google.active_minutes:com.google.android.gms:merge_active_minutes
```
- Finally, create an event hook that inserts new data into your newly created tables:
```python
import datetime
import json

from platypush.event.hook import hook
from platypush.utils import run
from platypush.message.event.google.fit import GoogleFitEvent

db_engine = 'postgresql+pg8000://pi:your-password@localhost/sensors'


@hook(GoogleFitEvent)
def on_google_fit_data(event, **context):
    # Insert each new data point into the temporary table: the
    # on_tmp_fit_data_insert trigger then normalizes it into fit_data
    run('db.insert', engine=db_engine, table='tmp_fit_data',
        records=[{
            'username': event.user_id,
            'data_source': event.data_source_id,
            'orig_data_source': event.origin_data_source_id,
            'data_type': event.data_type,
            'value': event.values[0],
            'json_value': json.dumps(event.values),
            'start_time': datetime.datetime.fromtimestamp(event.start_time),
            'end_time': datetime.datetime.fromtimestamp(event.end_time),
        }]
    )
```
- Restart Platypush. You should soon start to see your fit data populating your tables.
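The hook above assumes that every event carries at least one numeric value, and it passes naive local timestamps to columns declared as `timestamp with time zone`. A slightly more defensive way to build the record could look like the sketch below. `build_fit_record` is a hypothetical helper, not part of Platypush, and it assumes that `start_time` and `end_time` are Unix timestamps in seconds, as the `fromtimestamp` calls in the hook suggest.

```python
import datetime
import json


def build_fit_record(user_id, data_source_id, origin_data_source_id,
                     data_type, values, start_time, end_time):
    """Build a row for tmp_fit_data from the fields of a Google Fit event."""
    values = list(values or [])
    first = values[0] if values else None
    # Only plain numbers fit the float column; anything else stays in json_value
    is_number = isinstance(first, (int, float)) and not isinstance(first, bool)

    def to_utc(timestamp):
        # Timezone-aware datetimes don't depend on the database session timezone
        return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)

    return {
        'username': user_id,
        'data_source': data_source_id,
        'orig_data_source': origin_data_source_id,
        'data_type': data_type,
        'value': float(first) if is_number else None,
        'json_value': json.dumps(values),
        'start_time': to_utc(start_time),
        'end_time': to_utc(end_time),
    }
```

Inside the hook you would then pass `records=[build_fit_record(event.user_id, event.data_source_id, event.origin_data_source_id, event.data_type, event.values, event.start_time, event.end_time)]` to `db.insert`.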
## Data visualization and automatic alerting
So now you’ve built your data pipeline to deliver system, sensor, mobile and fit data points to your local database and
build automation on those events. But we all know that data collection is only half the fun if we can’t visualize that
data. Time to set up Grafana and create some graphs!
You can install Grafana on Debian/Ubuntu/Raspbian/Raspberry Pi OS by adding the Grafana repository to your apt sources:
```shell
wget -q -O - https://packages.grafana.com/gpg.key | sudo apt-key add -
echo "deb https://packages.grafana.com/oss/deb stable main" | sudo tee -a /etc/apt/sources.list.d/grafana.list
sudo apt-get update
sudo apt-get install -y grafana
# Enable and start the service
sudo systemctl enable grafana-server
sudo systemctl start grafana-server
```
On Arch Linux, the `grafana` package is instead available from the official repositories.
Open `http://your-pi-address:3000/` in your browser, create an admin user and add your database to the configuration as
a PostgreSQL source.
Creating dashboards and panels in Grafana is really straightforward. All you need is to specify the visualization type
and the query that you want to run against your database. A simple panel that displays the steps walked per day and the
active time would look like this:
![Grafana view](../img/grafana-1.png)
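The query behind such a panel is an ordinary aggregation over the `vfit` view. The sketch below reproduces the idea in Python against an in-memory SQLite table that stands in for `vfit`, so that it can be run without a PostgreSQL instance. The sample rows and the `com.google.step_count.delta` data type name are assumptions made for illustration. In Grafana you would type the equivalent PostgreSQL query into the panel's query editor and restrict it to the time range of the dashboard.

```python
import sqlite3

# Sum the step counts by calendar day. In PostgreSQL you would typically
# group by date_trunc('day', start_time) instead of SQLite's date().
STEPS_PER_DAY = """
    select date(start_time) as day, sum(value) as steps
    from vfit
    where data_type = ?
    group by day
    order by day
"""


def steps_per_day(conn, data_type='com.google.step_count.delta'):
    """Return a list of (day, total steps) tuples, one per day."""
    return conn.execute(STEPS_PER_DAY, (data_type,)).fetchall()


# Stand-in for the vfit view, with only the columns used by the query
conn = sqlite3.connect(':memory:')
conn.execute('create table vfit(data_type text, value real, start_time text)')
conn.executemany('insert into vfit values(?, ?, ?)', [
    ('com.google.step_count.delta', 1200, '2019-10-14 08:00:00'),
    ('com.google.step_count.delta', 3400, '2019-10-14 18:30:00'),
    ('com.google.step_count.delta', 5000, '2019-10-15 09:15:00'),
    ('com.google.heart_rate.bpm', 72, '2019-10-15 09:15:00'),
])

for day, steps in steps_per_day(conn):
    print(day, steps)
```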
Grafana also allows you to create alerts when some metrics go below or above a certain threshold, or when there are no
data points for a certain period of time. You can also connect such alerts back to Platypush events by leveraging
Platypush’s [web hooks](https://docs.platypush.tech/en/latest/platypush/events/http.hook.html).
Lets see for example how to configure Grafana to send a notification to a Platypush custom web hook that sends a
Pushbullet notification to your mobile device when the measurements from one of your gas sensors go above a certain
threshold:
- Add a web hook script:
```python
from platypush.event.hook import hook
from platypush.utils import run
from platypush.message.event.http.hook import WebhookEvent


@hook(WebhookEvent, hook='gas_alert')
def on_gas_alert(event, **context):
    # Skip the notifications that report the alert as being back to 'ok'
    if event.state != 'ok':
        run('pushbullet.send_note', title=event.title,
            body='High concentration of gas detected!')
This configuration will create a dynamic web hook that can be accessed through `http://your-pi:8008/hook/gas_alert`.
- Go to your Grafana dashboard, click on “Alerting” (bell icon on the right) -> Notification channels and add your web
hook:
![Grafana view](../img/grafana-2.png)
- Edit the panel that contains your gas sensor measurements, click on the bell icon and add an automatic alert whenever
the value goes above a certain threshold:
![Grafana view](../img/grafana-3.png)
Youll receive a Pushbullet notification on your mobile device whenever there is an alert associated with your metric.
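To check the hook without waiting for a real gas alert, you can post a hand-made payload to it. The sketch below only uses the Python standard library. The payload is an assumption that mimics the two fields read by the hook (`state` and `title`), while real Grafana notifications carry more fields. `your-pi` is the placeholder host used throughout this article, and `build_alert_request` and `send_test_alert` are hypothetical helper names. Depending on your setup, the request may also need an authorization header.

```python
import json
import urllib.request


def build_alert_request(base_url, hook_name, state, title):
    """Prepare a POST request that imitates a Grafana alert notification."""
    payload = {'state': state, 'title': title}
    return urllib.request.Request(
        f'{base_url}/hook/{hook_name}',
        data=json.dumps(payload).encode(),
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send_test_alert(base_url='http://your-pi:8008'):
    """Send a fake alert to the gas_alert hook and return the HTTP status."""
    request = build_alert_request(base_url, 'gas_alert', 'alerting',
                                  'Gas sensor test alert')
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Calling `send_test_alert()` while Platypush is running should trigger the same Pushbullet notification as a real alert, since the state of the fake payload is not `ok`.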
If youve read the article so far you should have all the ingredients in place to do anything you want with your own
data. This article tries its best to show useful examples but isnt intended to be an exhaustive guide to everything
that you can do by connecting a database, a data pipeline and an event and automation engine. I hope that I have
provided you with enough inputs to stimulate your creativity and build something new :)