platypush/platypush/plugins/weather/buienradar.py
Fabio Manganiello 49a7ee643e - Added Buienradar integration
- Refactored weather plugin/backend as Darksky plugin/backend
2019-12-31 08:51:19 +01:00

131 lines
5.2 KiB
Python

from typing import Optional, Dict, Any
from platypush.plugins import Plugin, action
from platypush.message.response.weather.buienradar import BuienradarWeatherResponse, BuienradarPrecipitationResponse, \
BuienradarForecastResponse, BuienradarForecast
class WeatherBuienradarPlugin(Plugin):
"""
Plugin for getting weather updates through Buienradar - a Dutch weather app.
Requires:
* **buienradar** (``pip install buienradar``)
"""
def __init__(self, lat: float, long: float, time_frame: int = 120, **kwargs):
"""
:param lat: Default latitude
:param long: Default longitude
:param time_frame: Default number of minutes to look ahead for precipitation forecast
"""
super().__init__(**kwargs)
self.lat = lat
self.long = long
self.time_frame = time_frame
self.latest_bulletin = {}
def get_data(self, lat: Optional[float] = None, long: Optional[float] = None, time_frame: Optional[int] = None) \
-> Dict[str, Any]:
# noinspection PyPackageRequirements
from buienradar.buienradar import get_data, parse_data
# noinspection PyPackageRequirements
from buienradar.constants import SUCCESS, CONTENT, RAINCONTENT, DATA
lat = lat or self.lat
long = long or self.long
time_frame = time_frame or self.time_frame
result = get_data(latitude=lat, longitude=long)
if not result.get(SUCCESS):
raise RuntimeError('Error while retrieving data')
data = result.get(CONTENT)
rain_data = result.get(RAINCONTENT)
result = parse_data(data, rain_data, lat, long, time_frame)
return result.get(DATA, {})
@action
def get_weather(self, lat: Optional[float] = None, long: Optional[float] = None) -> BuienradarWeatherResponse:
"""
Get the current weather conditions.
:param lat: Weather latitude (default: configured latitude)
:param long: Weather longitude (default: configured longitude)
"""
data = self.get_data(lat, long, 60)
return BuienradarWeatherResponse(
barometer_fc=data.get('barometerfcname'),
condition_name=data.get('condition', {}).get('condition'),
condition_name_long=data.get('condition', {}).get('exact'),
condition_image=data.get('condition', {}).get('image'),
feel_temperature=data.get('feeltemperature'),
ground_temperature=data.get('groundtemperature'),
humidity=data.get('humidity'),
irradiance=data.get('irradiance'),
measured=data.get('measured'),
precipitation=data.get('precipitation'),
pressure=data.get('pressure'),
rain_last_24_hours=data.get('rainlast24hour'),
rain_last_hour=data.get('rainlasthour'),
station_name=data.get('stationname'),
temperature=data.get('temperature'),
visibility=data.get('visibility'),
wind_azimuth=data.get('windazimuth'),
wind_direction=data.get('wind_irection'),
wind_force=data.get('windforce'),
wind_gust=data.get('windgust'),
wind_speed=data.get('windspeed')
)
@action
def get_forecast(self, lat: Optional[float] = None, long: Optional[float] = None) -> BuienradarForecastResponse:
"""
Get the weather forecast for the next days.
:param lat: Weather latitude (default: configured latitude)
:param long: Weather longitude (default: configured longitude)
"""
data = self.get_data(lat, long, 60).get('forecast', [])
return BuienradarForecastResponse([
BuienradarForecast(
condition_name=d.get('condition', {}).get('condition'),
condition_name_long=d.get('condition', {}).get('exact'),
condition_image=d.get('condition', {}).get('image'),
date_time=d.get('datetime'),
rain=d.get('rain'),
min_rain=d.get('minrain'),
max_rain=d.get('maxrain'),
rain_chance=d.get('rainchance'),
snow=d.get('snow'),
temperature=d.get('temperature'),
wind_azimuth=d.get('windazimuth'),
wind_direction=d.get('winddirection'),
wind_force=d.get('windforce'),
wind_speed=d.get('windspeed'),
)
for d in data
])
@action
def get_precipitation(self, lat: Optional[float] = None, long: Optional[float] = None,
time_frame: Optional[int] = None) -> BuienradarPrecipitationResponse:
"""
Get the precipitation forecast for the specified time frame.
:param lat: Weather latitude (default: configured latitude)
:param long: Weather longitude (default: configured longitude)
:param time_frame: Time frame for the forecast in minutes (default: configured time_frame)
"""
data = self.get_data(lat, long, time_frame).get('precipitation_forecast', {})
return BuienradarPrecipitationResponse(
average=data.get('average'),
total=data.get('total'),
time_frame=data.get('timeframe'),
)
# vim:sw=4:ts=4:et: