diff --git a/docs/source/backends.rst b/docs/source/backends.rst index c209095c6..3a152791e 100644 --- a/docs/source/backends.rst +++ b/docs/source/backends.rst @@ -10,7 +10,6 @@ Backends platypush/backend/button.flic.rst platypush/backend/camera.pi.rst platypush/backend/chat.telegram.rst - platypush/backend/google.fit.rst platypush/backend/google.pubsub.rst platypush/backend/gps.rst platypush/backend/http.rst diff --git a/docs/source/platypush/backend/google.fit.rst b/docs/source/platypush/backend/google.fit.rst deleted file mode 100644 index 74da2573e..000000000 --- a/docs/source/platypush/backend/google.fit.rst +++ /dev/null @@ -1,6 +0,0 @@ -``google.fit`` -================================ - -.. automodule:: platypush.backend.google.fit - :members: - diff --git a/platypush/backend/google/fit/__init__.py b/platypush/backend/google/fit/__init__.py deleted file mode 100644 index 30c2ed27f..000000000 --- a/platypush/backend/google/fit/__init__.py +++ /dev/null @@ -1,129 +0,0 @@ -import datetime -import time - -from platypush.backend import Backend -from platypush.context import get_plugin -from platypush.message.event.google.fit import GoogleFitEvent -from platypush.utils import camel_case_to_snake_case - - -class GoogleFitBackend(Backend): - """ - This backend will listen for new Google Fit events (e.g. new weight/height - measurements, new fitness activities etc.) on the specified data streams and - fire an event upon new data. - - Requires: - - * The **google.fit** plugin - (:class:`platypush.plugins.google.fit.GoogleFitPlugin`) enabled. - - """ - - _default_poll_seconds = 60 - _default_user_id = 'me' - _last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP_' - - def __init__( - self, - data_sources, - user_id=_default_user_id, - poll_seconds=_default_poll_seconds, - *args, - **kwargs - ): - """ - :param data_sources: Google Fit data source IDs to monitor. You can - get a list of the available data sources through the - :meth:`platypush.plugins.google.fit.GoogleFitPlugin.get_data_sources` - action - :type data_sources: list[str] - - :param user_id: Google user ID to track (default: 'me') - :type user_id: str - - :param poll_seconds: How often the backend will query the data sources - for new data points (default: 60 seconds) - :type poll_seconds: float - """ - - super().__init__(*args, **kwargs) - - self.data_sources = data_sources - self.user_id = user_id - self.poll_seconds = poll_seconds - - def run(self): - super().run() - self.logger.info( - 'Started Google Fit backend on data sources {}'.format(self.data_sources) - ) - - while not self.should_stop(): - try: - for data_source in self.data_sources: - varname = self._last_timestamp_varname + data_source - last_timestamp = float( - get_plugin('variable').get(varname).output.get(varname) or 0 - ) - - new_last_timestamp = last_timestamp - self.logger.info( - 'Processing new entries from data source {}, last timestamp: {}'.format( - data_source, - str(datetime.datetime.fromtimestamp(last_timestamp)), - ) - ) - - data_points = ( - get_plugin('google.fit') - .get_data(user_id=self.user_id, data_source_id=data_source) - .output - ) - new_data_points = 0 - - for dp in data_points: - dp_time = dp.pop('startTime', 0) - if 'dataSourceId' in dp: - del dp['dataSourceId'] - - if dp_time > last_timestamp: - self.bus.post( - GoogleFitEvent( - user_id=self.user_id, - data_source_id=data_source, - data_type=dp.pop('dataTypeName'), - start_time=dp_time, - end_time=dp.pop('endTime'), - modified_time=dp.pop('modifiedTime'), - values=dp.pop('values'), - **{ - camel_case_to_snake_case(k): v - for k, v in dp.items() - } - ) - ) - - new_data_points += 1 - - new_last_timestamp = max(dp_time, new_last_timestamp) - - last_timestamp = new_last_timestamp - self.logger.info( - 'Got {} new entries from data source {}, last timestamp: {}'.format( - new_data_points, - data_source, - str(datetime.datetime.fromtimestamp(last_timestamp)), - ) - ) - - get_plugin('variable').set(**{varname: last_timestamp}) - except Exception as e: - self.logger.warning('Exception while processing Fit data') - self.logger.exception(e) - continue - - time.sleep(self.poll_seconds) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/backend/google/fit/manifest.yaml b/platypush/backend/google/fit/manifest.yaml deleted file mode 100644 index 7a5b3ef0e..000000000 --- a/platypush/backend/google/fit/manifest.yaml +++ /dev/null @@ -1,8 +0,0 @@ -manifest: - events: - platypush.message.event.google.fit.GoogleFitEvent: when a newdata point is received - on one of the registered streams. - install: - pip: [] - package: platypush.backend.google.fit - type: backend diff --git a/platypush/plugins/google/fit/__init__.py b/platypush/plugins/google/fit/__init__.py index a4610688b..fd300e1b5 100644 --- a/platypush/plugins/google/fit/__init__.py +++ b/platypush/plugins/google/fit/__init__.py @@ -1,8 +1,14 @@ -from platypush.plugins import action +from datetime import datetime +from typing import Generator, Iterable, Optional + +from platypush.context import Variable +from platypush.message.event.google.fit import GoogleFitEvent +from platypush.plugins import RunnablePlugin, action from platypush.plugins.google import GooglePlugin +from platypush.utils import camel_case_to_snake_case -class GoogleFitPlugin(GooglePlugin): +class GoogleFitPlugin(GooglePlugin, RunnablePlugin): r""" Google Fit plugin. @@ -63,6 +69,38 @@ class GoogleFitPlugin(GooglePlugin): script. Otherwise, if you're running the script on a desktop, a browser window will be opened automatically. + When not configured with any ``data_sources``, the plugin won't start the + event monitor and won't trigger any + :class:`platypush.message.event.google.fit.GoogleFitEvent`. You can still + fetch data points programmatically through the :meth:`.get_data` though. + + If you want to monitor data sources and fire events, then you need to + explicitly define which health metrics you want to monitor through this + integration. + + After starting the plugin, you can get a list of the available data sources + by running the :meth:`.get_data_sources` action. The ``dataStreamId`` + fields are usually the ones you want to configure in your data sources. + + Unless you are interested in monitoring data points from specific devices, + you may want to look for ``dataStreamId`` fields that match the + ``derived:*:merge*`` pattern. Some popular examples include:e.g. + + - ``derived:com.google.step_count.delta:merge_step_deltas``, to monitor + the number of steps taken in a given time interval. + + - ``derived:com.google.active_minutes:com.google.android.gms:merge_active_minutes``, + to monitor the number of active minutes in a given time interval. + + - ``derived:com.google.speed:com.google.android.gms:merge_speed``, to + monitor the speed in a given time interval. + + - ``derived:com.google.calories.expended:com.google.android.gms:merge_calories_expended``, + to monitor the number of calories burned in a given time interval. + + - ``derived:com.google.heart_rate.bpm:com.google.android.gms:merge_heart_rate_bpm``, + to monitor the heart rate measured in a given time interval. + """ scopes = [ @@ -74,31 +112,78 @@ class GoogleFitPlugin(GooglePlugin): 'https://www.googleapis.com/auth/fitness.location.read', ] - def __init__(self, user_id='me', *args, **kwargs): + _default_user_id = 'me' + _last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP_' + + def __init__( + self, + data_sources: Iterable[str] = (), + user_id: str = _default_user_id, + poll_interval: float = 120.0, + **kwargs, + ): """ + :param data_sources: Google Fit data source IDs to monitor - e.g. + weight, heartbeat, steps etc. You can get a list of the available + data sources on your account through the :meth:`.get_data_sources` + action. If none is specified then no sources will be monitored. :param user_id: Default Google user_id (default: 'me', default configured account user) - :type user_id: str or int + :param poll_interval: How often the plugin should poll for new events + (default: 120 seconds). """ - super().__init__(scopes=self.scopes, *args, **kwargs) + super().__init__( + scopes=self.scopes, + poll_interval=poll_interval, + disable_monitor=not data_sources, + **kwargs, + ) + self.user_id = user_id + self.data_sources = data_sources + + def _last_timestamp_var(self, data_source: str) -> Variable: + return Variable(self._last_timestamp_varname + data_source) + + def _get_last_timestamp(self, data_source: str) -> float: + return float(self._last_timestamp_var(data_source).get() or 0) + + def _set_last_timestamp(self, data_source: str, timestamp: float): + self._last_timestamp_var(data_source).set(timestamp) @action - def get_data_sources(self, user_id=None): + def get_data_sources(self, user_id: Optional[str] = None): """ - Get the available data sources for the specified user_id + Get the available data sources for the specified user. + + :param user_id: Target user ID (default: configured user). """ + from googleapiclient.errors import HttpError service = self.get_service(service='fitness', version='v1') - sources = ( - service.users().dataSources().list(userId=user_id or self.user_id).execute() - ) + + try: + sources = ( + service.users() # pylint: disable=no-member + .dataSources() + .list(userId=user_id or self.user_id) + .execute() + ) + except HttpError as e: + err = f'Error while getting data sources: {e.status_code}: {e.reason}' + self.logger.warning(err) + raise AssertionError(err) from e return sources['dataSource'] @action - def get_data(self, data_source_id, user_id=None, limit=None): + def get_data( + self, + data_source_id: str, + user_id: Optional[str] = None, + limit: Optional[int] = 100, + ): """ Get raw data for the specified data_source_id @@ -107,6 +192,30 @@ class GoogleFitPlugin(GooglePlugin): :param user_id: Target user ID (default: configured user). :param limit: Maximum number of items to return. """ + return list( + self._get_data(data_source_id=data_source_id, user_id=user_id, limit=limit) + ) + + def _get_timestamp(self, dp: dict, prefix: str) -> Optional[float]: + basename = name = prefix + 'Time' + t = dp.pop(name, None) + if t is not None: + return float(t) + + name = basename + 'Millis' + t = dp.pop(name, None) + if t is not None: + return float(t) / 1e3 + + name = basename + 'Nanos' + t = dp.pop(name, None) + if t is not None: + return float(t) / 1e9 + + def _get_data( + self, data_source_id, user_id: Optional[str] = None, limit: Optional[int] = 100 + ) -> Generator[dict, None, None]: + from googleapiclient.errors import HttpError service = self.get_service(service='fitness', version='v1') kwargs = { @@ -116,45 +225,106 @@ class GoogleFitPlugin(GooglePlugin): if limit: kwargs['limit'] = limit - data_points = [] - for data_point in ( - service.users() - .dataSources() - .dataPointChanges() - .list(**kwargs) - .execute() - .get('insertedDataPoint', []) + try: + for data_point in ( + service.users() # pylint: disable=no-member + .dataSources() + .dataPointChanges() + .list(**kwargs) + .execute() + .get('insertedDataPoint', []) + ): + data_point['startTime'] = self._get_timestamp(data_point, 'start') + data_point['endTime'] = self._get_timestamp(data_point, 'end') + data_point['modifiedTime'] = self._get_timestamp(data_point, 'modified') + values = [] + + for value in data_point.pop('value', data_point.pop('values', [])): + if isinstance(value, (int, float, str)): + values.append(value) + continue + + if value.get('intVal') is not None: + value = value['intVal'] + elif value.get('fpVal') is not None: + value = value['fpVal'] + elif value.get('stringVal') is not None: + value = value['stringVal'] + elif value.get('mapVal'): + value = { + v['key']: v['value'].get( + 'intVal', + v['value'].get('fpVal', v['value'].get('stringVal')), + ) + for v in value['mapVal'] + } + + values.append(value) + + data_point['values'] = values + yield data_point + except HttpError as e: + err = f'Error while getting data points: {e.status_code}: {e.reason}' + self.logger.warning(err) + raise AssertionError(err) from e + + def _process_data_source(self, data_source: str): + last_timestamp = new_last_timestamp = self._get_last_timestamp(data_source) + + self.logger.debug( + 'Processing new entries from data source %s, last timestamp: %s', + data_source, + datetime.fromtimestamp(last_timestamp).isoformat(), + ) + + new_data_points = 0 + for dp in self._get_data( + user_id=self.user_id, + data_source_id=data_source, + limit=100, ): - data_point['startTime'] = float(data_point.pop('startTimeNanos')) / 1e9 - data_point['endTime'] = float(data_point.pop('endTimeNanos')) / 1e9 - data_point['modifiedTime'] = ( - float(data_point.pop('modifiedTimeMillis')) / 1e6 + dp_time = dp.pop('startTime', 0) + dp.pop('dataStreamId', None) + + if dp_time > last_timestamp: + self._bus.post( + GoogleFitEvent( + user_id=self.user_id, + data_source_id=data_source, + data_type=dp.pop('dataTypeName'), + start_time=dp_time, + end_time=dp.pop('endTime'), + modified_time=dp.pop('modifiedTime'), + values=dp.pop('values'), + **{camel_case_to_snake_case(k): v for k, v in dp.items()}, + ) + ) + + new_data_points += 1 + + new_last_timestamp = max(dp_time, new_last_timestamp) + + last_timestamp = new_last_timestamp + if new_data_points > 0: + self._set_last_timestamp(data_source, last_timestamp) + self.logger.info( + 'Got %d new entries from data source %s, last timestamp: %s', + new_data_points, + data_source, + datetime.fromtimestamp(last_timestamp).isoformat(), ) - values = [] - for value in data_point.pop('value'): - if value.get('intVal') is not None: - value = value['intVal'] - elif value.get('fpVal') is not None: - value = value['fpVal'] - elif value.get('stringVal') is not None: - value = value['stringVal'] - elif value.get('mapVal'): - value = { - v['key']: v['value'].get( - 'intVal', - v['value'].get('fpVal', v['value'].get('stringVal')), - ) - for v in value['mapVal'] - } - - values.append(value) - - data_point['values'] = values - data_points.append(data_point) - - return data_points + def main(self): + while not self.should_stop(): + try: + for data_source in self.data_sources: + self._process_data_source(data_source) + except Exception as e: + self.logger.warning('Exception while processing Fit data') + self.logger.exception(e) + finally: + self.wait_stop(self.poll_interval) # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/google/fit/manifest.yaml b/platypush/plugins/google/fit/manifest.yaml index 9d48ecb05..bc3c4bc8f 100644 --- a/platypush/plugins/google/fit/manifest.yaml +++ b/platypush/plugins/google/fit/manifest.yaml @@ -1,5 +1,6 @@ manifest: - events: {} + events: + - platypush.message.event.google.fit.GoogleFitEvent install: apk: - py3-google-api-python-client