Storing last processed timestamp per fit metric instead of a single timestamp for all the metrics to prevent new data points from being masked from newer ones from other metrics

This commit is contained in:
Fabio Manganiello 2019-03-28 02:36:16 +01:00
parent 68f985d2c6
commit c3b2212972

View file

@ -27,7 +27,7 @@ class GoogleFitBackend(Backend):
_default_poll_seconds = 60 _default_poll_seconds = 60
_default_user_id = 'me' _default_user_id = 'me'
_last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP' _last_timestamp_varname = '_GOOGLE_FIT_LAST_TIMESTAMP_'
def __init__(self, data_sources, user_id=_default_user_id, def __init__(self, data_sources, user_id=_default_user_id,
poll_seconds=_default_poll_seconds, *args, **kwargs): poll_seconds=_default_poll_seconds, *args, **kwargs):
@ -51,26 +51,26 @@ class GoogleFitBackend(Backend):
self.user_id = user_id self.user_id = user_id
self.poll_seconds = poll_seconds self.poll_seconds = poll_seconds
def run(self): def run(self):
super().run() super().run()
self.logger.info('Started Google Fit backend on data sources {}'.format( self.logger.info('Started Google Fit backend on data sources {}'.format(
self.data_sources)) self.data_sources))
last_timestamp = float(get_plugin('variable').
get(self._last_timestamp_varname).output.
get(self._last_timestamp_varname) or 0)
while not self.should_stop(): while not self.should_stop():
new_last_timestamp = last_timestamp
self.logger.info('Scanning fit data source, last seen timestamp: {}'.
format(str(datetime.datetime.fromtimestamp(last_timestamp))))
try: try:
for data_source in self.data_sources: for data_source in self.data_sources:
data_source_last_timestamp = 0 varname = self._last_timestamp_varname + data_source
last_timestamp = float(get_plugin('variable').
get(varname).output.get(varname) or 0)
new_last_timestamp = last_timestamp
self.logger.info('Processing new entries from data source {}, last timestamp: {}'.
format(data_source,
str(datetime.datetime.fromtimestamp(last_timestamp))))
data_points = get_plugin('google.fit').get_data( data_points = get_plugin('google.fit').get_data(
user_id=self.user_id, data_source_id=data_source).output user_id=self.user_id, data_source_id=data_source).output
new_data_points = 0
for dp in data_points: for dp in data_points:
dp_time = dp.pop('startTime', 0) dp_time = dp.pop('startTime', 0)
@ -89,22 +89,21 @@ class GoogleFitBackend(Backend):
for k, v in dp.items()} for k, v in dp.items()}
)) ))
new_data_points += 1
new_last_timestamp = max(dp_time, new_last_timestamp) new_last_timestamp = max(dp_time, new_last_timestamp)
data_source_last_timestamp = max(dp_time, data_source_last_timestamp)
self.logger.info('Got {} entries from data source {}, last timestamp: {}'.
format(len(data_points), data_source,
str(datetime.datetime.fromtimestamp(data_source_last_timestamp))))
last_timestamp = new_last_timestamp last_timestamp = new_last_timestamp
self.logger.info('Got {} new entries from data source {}, last timestamp: {}'.
format(new_data_points, data_source,
str(datetime.datetime.fromtimestamp(last_timestamp))))
get_plugin('variable').set(**{ get_plugin('variable').set(**{varname: last_timestamp})
self._last_timestamp_varname: last_timestamp})
except Exception as e: except Exception as e:
self.logger.warning('Exception while processing Fit data') self.logger.warning('Exception while processing Fit data')
self.logger.exception(e) self.logger.exception(e)
continue continue
time.sleep(self.poll_seconds) time.sleep(self.poll_seconds)