From 3bb4c4434c0e17b3abac303e3c75f72ad81bd821 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Thu, 30 Apr 2020 00:44:34 +0200 Subject: [PATCH] Added Covid-19 country stats plugin and backend/monitor --- platypush/backend/covid19.py | 114 +++++++++++++++++++++++++++++ platypush/message/event/covid19.py | 24 ++++++ platypush/plugins/covid19.py | 64 ++++++++++++++++ 3 files changed, 202 insertions(+) create mode 100644 platypush/backend/covid19.py create mode 100644 platypush/message/event/covid19.py create mode 100644 platypush/plugins/covid19.py diff --git a/platypush/backend/covid19.py b/platypush/backend/covid19.py new file mode 100644 index 00000000..8bf3a88e --- /dev/null +++ b/platypush/backend/covid19.py @@ -0,0 +1,114 @@ +import datetime +import os +from typing import Optional, Union, List, Dict, Any + +from sqlalchemy import create_engine, Column, Integer, String, DateTime +from sqlalchemy.orm import sessionmaker, scoped_session +from sqlalchemy.ext.declarative import declarative_base + +from platypush.backend import Backend +from platypush.config import Config +from platypush.context import get_plugin +from platypush.message.event.covid19 import Covid19UpdateEvent +from platypush.plugins.covid19 import Covid19Plugin + +Base = declarative_base() +Session = scoped_session(sessionmaker()) + + +class Covid19Update(Base): + """ Models the Covid19Data table """ + + __tablename__ = 'covid19data' + __table_args__ = ({'sqlite_autoincrement': True}) + + country = Column(String, primary_key=True) + confirmed = Column(Integer, nullable=False, default=0) + deaths = Column(Integer, nullable=False, default=0) + recovered = Column(Integer, nullable=False, default=0) + last_updated_at = Column(DateTime, nullable=False, default=datetime.datetime.utcnow) + + +class Covid19Backend(Backend): + """ + This backend polls new data about the Covid-19 pandemic diffusion and triggers events when new data is available. + + Triggers: + + - :class:`platypush.message.event.covid19.Covid19UpdateEvent` when new data is available. + + """ + + # noinspection PyProtectedMember + def __init__(self, country: Optional[Union[str, List[str]]], poll_seconds: Optional[float] = 3600.0, **kwargs): + """ + :param country: Default country (or list of countries) to retrieve the stats for. It can either be the full + country name or the country code. Special values: + + - ``world``: Get worldwide stats. + - ``all``: Get all the available stats. + + Default: either the default configured on the :class:`platypush.plugins.covid19.Covid19Plugin` plugin or + ``world``. + + :param poll_seconds: How often the backend should check for new check-ins (default: one hour). + """ + super().__init__(poll_seconds=poll_seconds, **kwargs) + self._plugin: Covid19Plugin = get_plugin('covid19') + self.country: List[str] = self._plugin._get_countries(country) + self.workdir = os.path.join(os.path.expanduser(Config.get('workdir')), 'covid19') + self.dbfile = os.path.join(self.workdir, 'data.db') + os.makedirs(self.workdir, exist_ok=True) + + def __enter__(self): + self.logger.info('Started Covid19 backend') + + def __exit__(self, exc_type, exc_val, exc_tb): + self.logger.info('Stopped Covid19 backend') + + def _process_update(self, summary: Dict[str, Any], session: Session): + update_time = datetime.datetime.fromisoformat(summary['Date'].replace('Z', '+00:00')) + + self.bus.post(Covid19UpdateEvent( + country=summary['CountryCode'], + confirmed=summary['TotalConfirmed'], + deaths=summary['TotalDeaths'], + recovered=summary['TotalRecovered'], + update_time=update_time, + )) + + session.add(Covid19Update(country=summary['CountryCode'], + confirmed=summary['TotalConfirmed'], + deaths=summary['TotalDeaths'], + recovered=summary['TotalRecovered'], + last_updated_at=update_time)) + + def loop(self): + # noinspection PyUnresolvedReferences + summaries = self._plugin.summary(self.country).output + if not summaries: + return + + engine = create_engine('sqlite:///{}'.format(self.dbfile), connect_args={'check_same_thread': False}) + Base.metadata.create_all(engine) + Session.configure(bind=engine) + session = Session() + + last_records = { + record.country: record + for record in session.query(Covid19Update).filter(Covid19Update.country.in_(self.country)).all() + } + + for summary in summaries: + country = summary['CountryCode'] + last_record = last_records.get(country) + if not last_record or \ + summary['TotalConfirmed'] != last_record.confirmed or \ + summary['TotalDeaths'] != last_record.deaths or \ + summary['TotalRecovered'] != last_record.recovered: + self._process_update(summary=summary, session=session) + + session.commit() + + +# vim:sw=4:ts=4:et: diff --git a/platypush/message/event/covid19.py b/platypush/message/event/covid19.py new file mode 100644 index 00000000..9e21ac9b --- /dev/null +++ b/platypush/message/event/covid19.py @@ -0,0 +1,24 @@ +from datetime import datetime +from typing import Optional + +from platypush.message.event import Event + + +class Covid19UpdateEvent(Event): + def __init__(self, + confirmed: int, + deaths: int, + recovered: int, + country: Optional[str] = None, + update_time: Optional[datetime] = None, + *args, **kwargs): + super().__init__(*args, + confirmed=confirmed, + deaths=deaths, + recovered=recovered, + country=country, + update_time=update_time, + **kwargs) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/covid19.py b/platypush/plugins/covid19.py new file mode 100644 index 00000000..c02a6b72 --- /dev/null +++ b/platypush/plugins/covid19.py @@ -0,0 +1,64 @@ +from typing import Optional, Union, List, Dict, Any + +import requests + +from platypush.plugins import Plugin, action + + +class Covid19Plugin(Plugin): + """ + Monitor the diffusion data of the COVID-19 pandemic by using the public API at https://api.covid19api.com. + """ + + base_url = 'https://api.covid19api.com' + + def __init__(self, country: Union[str, List[str]] = 'world', **kwargs): + """ + :param country: Default country (or list of countries) to retrieve the stats for. It can either be the full + country name or the country code. Special values: + + - ``world``: Get worldwide stats (default). + - ``all``: Get all the available stats. + """ + super().__init__(**kwargs) + self.country = country + self.country = self._get_countries(country) + + def _get_countries(self, country: Optional[Union[str, List[str]]] = None) -> List[str]: + country = country or self.country + if isinstance(country, str): + country = country.split(',') + return [c.upper().strip() for c in country] + + @action + def summary(self, country: Optional[Union[str, List[str]]] = None) -> List[Dict[str, Any]]: + """ + Get the summary data for the world or a country. + + :param country: Default country override. + """ + countries = self._get_countries(country) + response = requests.get('{}/summary'.format(self.base_url)).json() + if countries[0] == 'all': + return response.get('Countries', []) + if countries[0] == 'world': + return response.get('Global', {}) + + return [ + c for c in response.get('Countries', []) + if c.get('CountryCode').upper() in countries + or c.get('Country').upper() in countries + ] + + @action + def data(self, country: Optional[Union[str, List[str]]] = None) -> List[Dict[str, Any]]: + """ + Get all the data for a country. + + :param country: Default country override. + """ + country = self._get_countries(country)[0] + return requests.get('{}/total/dayone/country/{}'.format(self.base_url, country)).json() + + +# vim:sw=4:ts=4:et: