From c3b2212972885fb7ffcd9a08b3ac2c162b543db1 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 28 Mar 2019 02:36:16 +0100 Subject: [PATCH] Storing last processed timestamp per fit metric instead of a single timestamp for all the metrics to prevent new data points from being masked from newer ones from other metrics --- platypush/backend/google/fit.py | 39 ++++++++++++++++----------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/platypush/backend/google/fit.py b/platypush/backend/google/fit.py index b591f659b..e8af7bcf9 100644 --- a/platypush/backend/google/fit.py +++ b/platypush/backend/google/fit.py @@ -27,7 +27,7 @@ class GoogleFitBackend(Backend): _default_poll_seconds = 60 _default_user_id = 'me' - _last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP' + _last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP_' def __init__(self, data_sources, user_id=_default_user_id, poll_seconds=_default_poll_seconds, *args, **kwargs): @@ -51,26 +51,26 @@ class GoogleFitBackend(Backend): self.user_id = user_id self.poll_seconds = poll_seconds - def run(self): super().run() self.logger.info('Started Google Fit backend on data sources {}'.format( self.data_sources)) - last_timestamp = float(get_plugin('variable'). - get(self._last_timestamp_varname).output. - get(self._last_timestamp_varname) or 0) - while not self.should_stop(): - new_last_timestamp = last_timestamp - self.logger.info('Scanning fit data source, last seen timestamp: {}'. - format(str(datetime.datetime.fromtimestamp(last_timestamp)))) - try: for data_source in self.data_sources: - data_source_last_timestamp = 0 + varname = self._last_timestamp_varname + data_source + last_timestamp = float(get_plugin('variable'). + get(varname).output.get(varname) or 0) + + new_last_timestamp = last_timestamp + self.logger.info('Processing new entries from data source {}, last timestamp: {}'. + format(data_source, + str(datetime.datetime.fromtimestamp(last_timestamp)))) + data_points = get_plugin('google.fit').get_data( user_id=self.user_id, data_source_id=data_source).output + new_data_points = 0 for dp in data_points: dp_time = dp.pop('startTime', 0) @@ -85,26 +85,25 @@ class GoogleFitBackend(Backend): end_time=dp.pop('endTime'), modified_time=dp.pop('modifiedTime'), values=dp.pop('values'), - **{ camel_case_to_snake_case(k): v - for k,v in dp.items() } + **{camel_case_to_snake_case(k): v + for k, v in dp.items()} )) + new_data_points += 1 + new_last_timestamp = max(dp_time, new_last_timestamp) - data_source_last_timestamp = max(dp_time, data_source_last_timestamp) - self.logger.info('Got {} entries from data source {}, last timestamp: {}'. - format(len(data_points), data_source, - str(datetime.datetime.fromtimestamp(data_source_last_timestamp)))) last_timestamp = new_last_timestamp + self.logger.info('Got {} new entries from data source {}, last timestamp: {}'. + format(new_data_points, data_source, + str(datetime.datetime.fromtimestamp(last_timestamp)))) - get_plugin('variable').set(**{ - self._last_timestamp_varname: last_timestamp}) + get_plugin('variable').set(**{varname: last_timestamp}) except Exception as e: self.logger.warning('Exception while processing Fit data') self.logger.exception(e) continue - time.sleep(self.poll_seconds)