New interface for implementing backends.

Backends that simply poll for changes and wait some time
between checks can just implement a `loop()` method and,
optionally, `__enter__` and `__exit__` methods, so they
can perform initialization/cleanup logic within a context
manager.
This commit is contained in:
Fabio Manganiello 2020-01-11 18:13:25 +01:00
parent a7ca779870
commit 1e342cc8a5
2 changed files with 48 additions and 24 deletions

View file

@ -5,8 +5,10 @@
import logging import logging
import threading import threading
import time
from threading import Thread from threading import Thread
from typing import Optional
from platypush.bus import Bus from platypush.bus import Bus
from platypush.config import Config from platypush.config import Config
@ -35,13 +37,15 @@ class Backend(Thread, EventGenerator):
_default_response_timeout = 5 _default_response_timeout = 5
def __init__(self, bus=None, **kwargs): # Loop function, can be implemented by derived classes
loop = None
def __init__(self, bus: Optional[Bus] = None, poll_seconds: Optional[float] = None, **kwargs):
""" """
:param bus: Reference to the bus object to be used in the backend :param bus: Reference to the bus object to be used in the backend
:type bus: platypush.bus.Bus :param poll_seconds: If the backend implements a ``loop`` method, this parameter expresses how often the
loop should run in seconds.
:param kwargs: Key-value configuration for the backend :param kwargs: Key-value configuration for the backend
:type kwargs: dict
""" """
self._thread_name = self.__class__.__name__ self._thread_name = self.__class__.__name__
@ -51,6 +55,7 @@ class Backend(Thread, EventGenerator):
# If no bus is specified, create an internal queue where # If no bus is specified, create an internal queue where
# the received messages will be pushed # the received messages will be pushed
self.bus = bus or Bus() self.bus = bus or Bus()
self.poll_seconds = float(poll_seconds) if poll_seconds else None
self.device_id = Config.get('device_id') self.device_id = Config.get('device_id')
self.thread_id = None self.thread_id = None
self._should_stop = False self._should_stop = False
@ -218,6 +223,28 @@ class Backend(Thread, EventGenerator):
""" Starts the backend thread. To be implemented in the derived classes """ """ Starts the backend thread. To be implemented in the derived classes """
self.thread_id = threading.get_ident() self.thread_id = threading.get_ident()
set_thread_name(self._thread_name) set_thread_name(self._thread_name)
if not callable(self.loop):
return
with self:
while not self.should_stop():
try:
self.loop()
except Exception as e:
self.logger.error(str(e))
self.logger.exception(e)
finally:
if self.poll_seconds:
time.sleep(self.poll_seconds)
def __enter__(self):
""" Invoked when the backend is initialized, if the main logic is within a ``loop()`` function """
self.logger.info('Initialized backend {}'.format(self.__class__.__name__))
def __exit__(self, exc_type, exc_val, exc_tb):
""" Invoked when the backend is terminated, if the main logic is within a ``loop()`` function """
self.on_stop()
self.logger.info('Terminated backend {}'.format(self.__class__.__name__))
def on_stop(self): def on_stop(self):
""" Callback invoked when the process stops """ """ Callback invoked when the process stops """

View file

@ -1,4 +1,4 @@
import time from typing import Optional
from platypush.backend import Backend from platypush.backend import Backend
from platypush.context import get_plugin from platypush.context import get_plugin
@ -22,34 +22,31 @@ class FoursquareBackend(Backend):
_last_created_at_varname = '_foursquare_checkin_last_created_at' _last_created_at_varname = '_foursquare_checkin_last_created_at'
def __init__(self, poll_seconds: float = 60.0, *args, **kwargs): def __init__(self, poll_seconds: Optional[float] = 60.0, *args, **kwargs):
""" """
:param poll_seconds: How often the backend should check for new check-ins (default: one minute). :param poll_seconds: How often the backend should check for new check-ins (default: one minute).
""" """
super().__init__(*args, **kwargs) super().__init__(*args, poll_seconds=poll_seconds, **kwargs)
self.poll_seconds = poll_seconds
self._last_created_at = None self._last_created_at = None
def run(self): def __enter__(self):
super().run()
self._last_created_at = int(get_plugin('variable').get(self._last_created_at_varname). self._last_created_at = int(get_plugin('variable').get(self._last_created_at_varname).
output.get(self._last_created_at_varname) or 0) output.get(self._last_created_at_varname) or 0)
self.logger.info('Started Foursquare backend') self.logger.info('Started Foursquare backend')
while not self.should_stop(): def loop(self):
try: checkins = get_plugin('foursquare').get_checkins().output
checkins = get_plugin('foursquare').get_checkins().output if not checkins:
if checkins: return
last_checkin = checkins[0]
if not self._last_created_at or last_checkin.get('createdAt', 0) > self._last_created_at: last_checkin = checkins[0]
self.bus.post(FoursquareCheckinEvent(checkin=last_checkin)) last_checkin_created_at = last_checkin.get('createdAt', 0)
self._last_created_at = last_checkin.get('createdAt', 0) if self._last_created_at and last_checkin_created_at <= self._last_created_at:
get_plugin('variable').set(**{self._last_created_at_varname: self._last_created_at}) return
except Exception as e:
self.logger.error('Error while retrieving the list of checkins: {}'.format(str(e))) self.bus.post(FoursquareCheckinEvent(checkin=last_checkin))
self.logger.exception(e) self._last_created_at = last_checkin_created_at
finally: get_plugin('variable').set(**{self._last_created_at_varname: self._last_created_at})
time.sleep(self.poll_seconds)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: