# platypush/platypush/backend/google/fit/__init__.py
#
# 130 lines
# 4.5 KiB
# Python

import datetime
import time
from platypush.backend import Backend
from platypush.context import get_plugin
from platypush.message.event.google.fit import GoogleFitEvent
from platypush.utils import camel_case_to_snake_case
class GoogleFitBackend(Backend):
    """
    This backend will listen for new Google Fit events (e.g. new weight/height
    measurements, new fitness activities etc.) on the specified data streams and
    fire an event upon new data.

    For every monitored data source the timestamp of the most recent data
    point seen so far is stored through the ``variable`` plugin, under a key
    made of ``_last_timestamp_varname`` followed by the data source ID. Only
    data points whose ``startTime`` is greater than that stored timestamp are
    posted on the bus as
    :class:`platypush.message.event.google.fit.GoogleFitEvent`.

    Requires:

        * The **google.fit** plugin
          (:class:`platypush.plugins.google.fit.GoogleFitPlugin`) enabled.
    """

    # Default interval, in seconds, between two consecutive polls.
    _default_poll_seconds = 60
    # Default Google user ID used when none is specified.
    _default_user_id = 'me'
    # Prefix of the variable name that stores the last processed timestamp;
    # the data source ID is appended to it.
    _last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP_'

    def __init__(
        self,
        data_sources,
        user_id=_default_user_id,
        poll_seconds=_default_poll_seconds,
        *args,
        **kwargs
    ):
        """
        :param data_sources: Google Fit data source IDs to monitor. You can
            get a list of the available data sources through the
            :meth:`platypush.plugins.google.fit.GoogleFitPlugin.get_data_sources`
            action
        :type data_sources: list[str]

        :param user_id: Google user ID to track (default: 'me')
        :type user_id: str

        :param poll_seconds: How often the backend will query the data sources
            for new data points (default: 60 seconds)
        :type poll_seconds: float
        """
        super().__init__(*args, **kwargs)
        self.data_sources = data_sources
        self.user_id = user_id
        self.poll_seconds = poll_seconds

    def _poll_data_source(self, data_source):
        """
        Fetch the data points of a single data source, post an event for each
        one that is newer than the stored timestamp, then store the updated
        timestamp.

        :param data_source: ID of the Google Fit data source to query.
        :type data_source: str
        :raises Exception: Anything raised by the plugins or by a data point
            that lacks one of the expected keys is propagated to the caller.
        """
        varname = self._last_timestamp_varname + data_source
        # A missing/empty stored value means "never polled": start from 0.
        last_timestamp = float(
            get_plugin('variable').get(varname).output.get(varname) or 0
        )
        new_last_timestamp = last_timestamp

        self.logger.info(
            'Processing new entries from data source {}, last timestamp: {}'.format(
                data_source,
                str(datetime.datetime.fromtimestamp(last_timestamp)),
            )
        )

        data_points = (
            get_plugin('google.fit')
            .get_data(user_id=self.user_id, data_source_id=data_source)
            .output
        )
        new_data_points = 0

        for dp in data_points:
            dp_time = dp.pop('startTime', 0)
            # The data source ID is passed explicitly to the event, so drop
            # the copy carried by the data point to avoid a duplicate.
            dp.pop('dataSourceId', None)

            if dp_time > last_timestamp:
                self.bus.post(
                    GoogleFitEvent(
                        user_id=self.user_id,
                        data_source_id=data_source,
                        data_type=dp.pop('dataTypeName'),
                        start_time=dp_time,
                        end_time=dp.pop('endTime'),
                        modified_time=dp.pop('modifiedTime'),
                        values=dp.pop('values'),
                        # Whatever is left on the data point is forwarded
                        # as extra snake_case keyword arguments.
                        **{
                            camel_case_to_snake_case(k): v
                            for k, v in dp.items()
                        }
                    )
                )
                new_data_points += 1

            new_last_timestamp = max(dp_time, new_last_timestamp)

        last_timestamp = new_last_timestamp
        self.logger.info(
            'Got {} new entries from data source {}, last timestamp: {}'.format(
                new_data_points,
                data_source,
                str(datetime.datetime.fromtimestamp(last_timestamp)),
            )
        )

        get_plugin('variable').set(**{varname: last_timestamp})

    def run(self):
        """
        Poll every configured data source until the backend is asked to stop,
        waiting ``poll_seconds`` between two consecutive poll cycles.
        """
        super().run()
        self.logger.info(
            'Started Google Fit backend on data sources {}'.format(self.data_sources)
        )

        while not self.should_stop():
            for data_source in self.data_sources:
                # Errors are handled per data source so that one failing
                # source does not prevent the following ones from being
                # polled in the same cycle.
                try:
                    self._poll_data_source(data_source)
                except Exception as e:
                    self.logger.warning('Exception while processing Fit data')
                    self.logger.exception(e)

            # Always wait before the next cycle, also after a failure:
            # retrying immediately would turn a persistent error into a
            # busy loop.
            time.sleep(self.poll_seconds)
# vim:sw=4:ts=4:et: