blog/static/pages/How-to-build-your-personal-infrastructure-for-data-collection-and-visualization.md


A smart home can generate and collect data. Lots of data. And there are currently a few outstanding issues with home-generated data:

  • Fragmentation. You probably have a home weather station, motion detectors, security cameras, gas and smoke detectors, body sensors, GPS and fitness trackers, and smart plugs around. It's quite likely that most of these devices generate data, that in most cases this data is only accessible through a proprietary app or web service, and that any integration with other services, or any room for tinkering and automation, depends on the willingness of the developer or business to build third-party interfaces for that data. In this article, we'll explore how open source solutions like Platypush, Grafana and Mosquitto make it possible to overcome the fragmentation issue and “glue” together data sources that otherwise wouldn't be able to communicate or share data.

  • Ability to query. Most hardware and data geeks won't settle for accessing their data through a gauge in an app or a timeline graph. Many of us want to explore our own data in a structured way, preferably through SQL or another query language, and we want tailor-made dashboards rather than dumb mobile apps. The ability to generate custom monthly reports of our fitness activities, query the countries we've visited in a given time range, how much time we spent indoors in the past three months, or how many times the smoke detector in the guest room went above threshold in the past week is, for many of us, priceless, and it's often neglected by hardware and software makers. In this article we'll explore how to leverage an open-source relational database (PostgreSQL in this example) and some elementary data pipelines to dispatch and store your data on your own computers, ready to be queried or visualized however you like.

  • Privacy. Many of the solutions or services I've mentioned in the previous examples come with their own cloud-based infrastructure to store user data. While storing your data on somebody else's computers saves you the time and disk space that a local solution requires, it also comes with all the concerns related to, well, storing your data on somebody else's computer. That somebody else can decide if and how you can access your data, can decide to sell your data for profit, or can be hacked one way or another. This is especially worrisome when we're talking about data about your own body, location or home environment. A self-hosted data infrastructure at home sidesteps the issue of third-party ownership of your data.

This article covers the building blocks needed to set up your data infrastructure and build automation on top of it. We'll see how to set up data collection and monitoring for a few use cases (temperature, humidity, gas, phone location and fitness data) and how to build automation triggers based on that data.

Dependencies setup

First, you'll need a RaspberryPi (or any similar clone) with Platypush. I assume that you've already got Platypush installed and configured. If not, please head to my previous article on getting started with Platypush.

You'll also need a relational database installed on your device. The examples in this article rely on PostgreSQL, but any relational database will do the job. To install and configure PostgreSQL on Raspbian and create a database named sensors:

[sudo] apt-get install postgresql libpq-dev postgresql-client postgresql-client-common -y
[sudo] systemctl restart postgresql.service
[sudo] su postgres
createuser pi -P --interactive
psql -U pi
> create database sensors;

We'll use the database to store the following information:

  • System metrics
  • Sensors data
  • Smartphone and location data
  • Fit data

You'll also need a message queue broker running on your RaspberryPi to dispatch messages with new data reads. Check this Instructables tutorial on how to get Mosquitto up and running on your RaspberryPi.

For some of the data measurements, we'll also need an MQTT client to test messages over the configured queue, for example to send measurements from a shell script. I like to use mqttcli for these purposes, since it's fast, lightweight and written in Go:

go get github.com/shirou/mqttcli

Finally, install Grafana as a web-based gateway to visualize your data:

[sudo] apt-get install grafana
[sudo] systemctl restart grafana-server

After starting the service, head to http://your-pi:3000 and make sure that you see the Grafana splash screen. Create a new admin user and you're good to go for now.

Now that you've got all the fundamental pieces in place, it's time to set up your data collection pipeline and dashboard. Let's start by setting up the tables and data storage logic on your database.

Database configuration

If you followed the instructions above, you'll have a PostgreSQL instance running on your RaspberryPi, accessible through the user pi, and a sensors database created for the purpose. In this section, I'll explain how to create the basic tables and the triggers that normalize the data. Keep in mind that your measurement tables might become quite large, depending on how much data you process and how often you process it. To keep the database size under control and the queries efficient, it's important to use normalized table structures enforced by triggers. I've prepared the following provisioning script for my purposes:

-- Temporary sensors table where we store the raw
-- measurements as received on the message queue

drop table if exists tmp_sensors cascade;
create table tmp_sensors(
    id          serial not null,
    host        varchar(64) not null,
    metric      varchar(255) not null,
    data        double precision,
    created_at  timestamp with time zone default CURRENT_TIMESTAMP,
    primary key(id)
);

-- Table to store the hosts associated to the data points

drop table if exists sensor_host cascade;
create table sensor_host(
    id          serial not null,
    host        varchar(64) unique not null,
    primary key(id)
);

-- Table to store the metrics

drop table if exists sensor_metric cascade;
create table sensor_metric(
    id          serial not null,
    metric      varchar(255) unique not null,
    primary key(id)
);

-- Table to store the normalized data points

drop table if exists sensor_data cascade;
create table sensor_data(
    id          serial not null,
    host_id     integer not null,
    metric_id   integer not null,
    data        double precision,
    created_at  timestamp with time zone default CURRENT_TIMESTAMP,

    primary key(id),
    foreign key(host_id) references sensor_host(id),
    foreign key(metric_id) references sensor_metric(id)
);

-- Define a stored procedure that normalizes new rows on tmp_sensors
-- by either creating or returning the associated host_id and metric_id,
-- creating a normalized representation of the row on sensor_data and
-- deleting the original raw entry on tmp_sensors.

create or replace function sync_sensors_data()
    returns trigger as
$$
begin
    insert into sensor_host(host) values(new.host)
        on conflict do nothing;

    insert into sensor_metric(metric) values(new.metric)
        on conflict do nothing;

    insert into sensor_data(host_id, metric_id, data) values(
        (select id from sensor_host where host = new.host),
        (select id from sensor_metric where metric = new.metric),
        new.data
    );

    delete from tmp_sensors where id = new.id;
    return new;
end;
$$
language 'plpgsql';

-- Create a trigger that invokes the stored procedure defined above
-- after a row is inserted on tmp_sensors

drop trigger if exists on_sensor_data_insert on tmp_sensors;

create trigger on_sensor_data_insert
    after insert on tmp_sensors
    for each row
    execute procedure sync_sensors_data();
    
-- View that joins the normalized tables back into a readable form

create or replace view public.vsensors as
select d.id as data_id,
       h.host,
       m.metric,
       d.data,
       d.created_at
from public.sensor_data d
join public.sensor_host h on d.host_id = h.id
join public.sensor_metric m on d.metric_id = m.id;

The script above keeps the data on your database normalized and query-friendly, even if the messages pushed to the message queue don't know the right numeric host_id or metric_id. Run it against the sensors database on your PostgreSQL instance:

psql -U pi -d sensors < database_provisioning.sql
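
If you want to see the normalization logic at work before touching your PostgreSQL instance, the following is a minimal sketch that reproduces the same idea with Python's built-in sqlite3 module. It is not the provisioning script itself: the schema is simplified, SQLite's `insert or ignore` stands in for `on conflict do nothing`, and the `normalize_demo` helper is a hypothetical name used only for illustration.

```python
import sqlite3

# Simplified SQLite stand-in for the PostgreSQL schema (assumption: this is
# for illustration only, the real setup uses PostgreSQL and plpgsql).
SCHEMA = """
create table tmp_sensors(
    id integer primary key,
    host text not null,
    metric text not null,
    data real
);
create table sensor_host(id integer primary key, host text unique not null);
create table sensor_metric(id integer primary key, metric text unique not null);
create table sensor_data(
    id integer primary key,
    host_id integer not null references sensor_host(id),
    metric_id integer not null references sensor_metric(id),
    data real,
    created_at text default CURRENT_TIMESTAMP
);
create trigger on_sensor_data_insert after insert on tmp_sensors
begin
    -- Create the host and metric rows only if they don't exist yet
    insert or ignore into sensor_host(host) values (new.host);
    insert or ignore into sensor_metric(metric) values (new.metric);
    -- Store the normalized data point
    insert into sensor_data(host_id, metric_id, data) values (
        (select id from sensor_host where host = new.host),
        (select id from sensor_metric where metric = new.metric),
        new.data
    );
    -- Drop the raw row once it has been normalized
    delete from tmp_sensors where id = new.id;
end;
"""

TABLES = ('tmp_sensors', 'sensor_host', 'sensor_metric', 'sensor_data')


def normalize_demo(rows):
    """Insert raw (host, metric, data) rows and return the row count per table."""
    conn = sqlite3.connect(':memory:')
    try:
        conn.executescript(SCHEMA)
        conn.executemany(
            'insert into tmp_sensors(host, metric, data) values (?, ?, ?)', rows
        )
        conn.commit()
        return {
            table: conn.execute(f'select count(*) from {table}').fetchone()[0]
            for table in TABLES
        }
    finally:
        conn.close()
```

Calling `normalize_demo` with a few readings from two hosts shows the intended behaviour: tmp_sensors ends up empty, each host and metric is stored once, and every reading lands in sensor_data.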

Now that you've got the tables ready, it's time to fill them with data. We'll see a few examples of metrics collection, starting with system metrics.

System metrics

You may want to monitor the CPU, RAM or disk usage of your own RaspberryPi or of any other host or virtual server you've got around, for example to set up a dashboard that tracks your metrics or to raise alerts in case something goes out of control.

First, create a script that checks the memory available on your system and sends the percentage of used memory to a message queue channel. We'll store this script under ~/bin/send_mem_stats.sh for the purposes of this tutorial:

#!/bin/bash

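# Second row of free(1) is "Mem:": column 2 is total, column 7 is available (MiB)
total=$(free -m | awk 'NR==2 {print $2}')
avail=$(free -m | awk 'NR==2 {print $7}')
# Percentage of used memory (integer arithmetic)
used=$(( (total - avail) * 100 / total ))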

export MQTT_HOST=your-mqtt-server
export MQTT_PORT=1883

$HOME/go/bin/mqttcli pub -t "sensors/$(hostname)/memory" -m $used

And schedule it to run at regular intervals in your crontab, for example every 5 minutes:

*/5 * * * * /bin/bash /home/pi/bin/send_mem_stats.sh

You can write similar scripts for other system stats, for example to monitor the root disk usage:

#!/bin/bash

usage=$(df -h | egrep '/$' | awk '{print $5}' | tr -d % | awk '{printf "%.2f", ($0/100)}')

export MQTT_HOST=your-mqtt-server
export MQTT_PORT=1883

$HOME/go/bin/mqttcli pub -t "sensors/$(hostname)/disk_root" -m $usage
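
If you'd rather not parse the output of free and df in the shell, roughly the same two numbers can be computed with the Python standard library. The sketch below is an assumption-laden alternative, not part of the original setup: `used_memory_percent` and `disk_usage_fraction` are hypothetical helper names, the memory helper expects the text of Linux's /proc/meminfo, and the disk figure may differ slightly from the Use% column of df because of how reserved blocks are counted.

```python
import shutil


def used_memory_percent(meminfo_text):
    """Return used memory as a percentage, given the contents of /proc/meminfo."""
    values = {}
    for line in meminfo_text.splitlines():
        key, _, rest = line.partition(':')
        fields = rest.split()
        if fields:
            # The first field after the colon is the value in kB
            values[key.strip()] = int(fields[0])

    total = values['MemTotal']
    avail = values['MemAvailable']
    return round((total - avail) * 100 / total, 2)


def disk_usage_fraction(path='/'):
    """Return the used fraction (0 to 1) of the filesystem that contains path."""
    usage = shutil.disk_usage(path)
    return round(usage.used / usage.total, 2)
```

On a Linux host you would read /proc/meminfo, pass its contents to `used_memory_percent`, and publish the result on the same topics used by the shell scripts.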

Alternatively, you can also write the logic for sending system data points directly in Platypush - e.g. using the system plugin - and that will be executed while the service is running, so you won't need the mqttcli dependency:

from platypush.config import Config
from platypush.cron import cron
from platypush.utils import run


@cron('*/5 * * * *')
def send_memory_stats(**context):
    mem = run('system.mem_virtual')
    run('mqtt.publish', host='your-mqtt-server', port=1883,
        topic=f'sensors/{Config.get("device_id")}/memory',
        msg=mem['percent'])

You can extend this pattern to any sensor data you want to send over the queue.

Once scheduled, these jobs will start pushing data to your message queue at regular intervals, on the configured topics (in the examples above, sensors/<hostname>/memory and sensors/<hostname>/disk_root respectively).

It's now time to set up Platypush to listen on those channels and, whenever a new message comes in, store it in the database you have provisioned. Add the following configuration to your ~/.config/platypush/config.yaml file:

# Enable the MQTT backend
backend.mqtt:
    host: your-mqtt-server
    port: 1883

    # Configure platypush to listen for new messages on these topics
    listeners:
        - host: your-mqtt-server
          topics:
              - sensors/host1/disk_root
              - sensors/host2/disk_root
              - sensors/host1/memory
              - sensors/host2/memory
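
The list of topics grows with every host and metric you add, since each combination is listed explicitly. As a convenience, a small helper can generate the entries to paste into the configuration. This is only a sketch: `sensor_topics` is a hypothetical name, and the host and metric names are the placeholders used in the configuration above.

```python
from itertools import product


def sensor_topics(hosts, metrics, prefix='sensors'):
    """Build one '<prefix>/<host>/<metric>' topic per host and metric pair."""
    return [
        f'{prefix}/{host}/{metric}'
        for metric, host in product(metrics, hosts)
    ]


if __name__ == '__main__':
    # Print the topics as YAML list items, ready to paste under "topics:"
    for topic in sensor_topics(['host1', 'host2'], ['disk_root', 'memory']):
        print(f'- {topic}')
```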

And create an event hook (e.g. under ~/.config/platypush/scripts/mqtt.py) that stores the messages received on some specified channels to your database:

from platypush.event.hook import hook
from platypush.utils import run

from platypush.message.event.mqtt import MQTTMessageEvent

db_engine = 'postgresql+pg8000://pi:your-password@localhost/sensors'


@hook(MQTTMessageEvent)
def on_mqtt_message(event, **context):
    if not event.topic.startswith('sensors/'):
        return

    (prefix, host, metric) = event.topic.split('/')
    run('db.insert',
        engine=db_engine,
        table='tmp_sensors',
        records=[{
            'host': host,
            'metric': metric,
            'data': event.msg,
        }]
    )

By inserting the data into tmp_sensors we make sure that the triggers that we previously declared on the database will be executed and the data will be normalized. Start Platypush, and if everything went smoothly you'll soon see your sensor_data table getting populated with memory and disk usage stats.
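
The hook above assumes that every topic has exactly three parts and that every payload is numeric. A topic with a different shape makes the tuple unpacking fail, and a non-numeric payload can't be stored in the double precision data column. A slightly more defensive variant could validate both before inserting. The helpers below are a sketch with hypothetical names (`parse_sensor_topic`, `parse_sensor_value`), not part of Platypush:

```python
def parse_sensor_topic(topic, prefix='sensors'):
    """Return (host, metric) for a '<prefix>/<host>/<metric>' topic, else None."""
    parts = topic.split('/')
    if len(parts) != 3 or parts[0] != prefix or not all(parts[1:]):
        return None
    return parts[1], parts[2]


def parse_sensor_value(msg):
    """Return the payload as a float, or None if it isn't numeric."""
    try:
        return float(msg)
    except (TypeError, ValueError):
        return None
```

Inside the hook you would call both helpers first and return early if either of them yields None, so that malformed messages never reach the database.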