Making JSON HTTP poll requests work

This commit is contained in:
Fabio Manganiello 2018-01-09 18:44:45 +01:00
parent c1ab3f5a14
commit 2ea06f7708
6 changed files with 139 additions and 23 deletions

View File

@ -1,11 +1,9 @@
import json
import requests
import importlib
import time
from threading import Thread
from platypush.message.response import Response
from .. import Backend
from platypush.bus import Bus
from platypush.backend import Backend
from platypush.backend.http.request import HttpRequest
class HttpPollBackend(Backend):
@ -14,36 +12,62 @@ class HttpPollBackend(Backend):
the bus whenever something new happened. Example configuration:
backend.http.poll:
services:
requests:
-
type: platypush.backend.http.service.ota.booking.GetReservations
method: GET
type: platypush.backend.http.request.JsonHttpRequest
args:
token: YOUR_TOKEN
poll_seconds: 10 # Check for updates on this endpoint every 10 seconds
limit: 5 # Return the first 5 (new) results (default: all)
url: https://hub-api.booking.com/v1/hotels/84326/reservations
headers:
X-Booking-Auth-Token: UXsYtIMJKCJB07/P/5Tz1iV8lzVY5kVVF0ZEnQRe+cg0
params:
updatedSince: 2018-01-09
poll_seconds: 10 # Check for updates on this endpoint every 10 seconds (default: 60)
timeout: 5 # Times out after 5 seconds (default)
"""
def __init__(self, services, *args, **kwargs):
def __init__(self, requests, *args, **kwargs):
"""
Params:
services -- List/iterable of HttpService objects
requests -- List/iterable of HttpRequest objects
"""
super().__init__(*args, **kwargs)
self.services = services
self.requests = []
self.http_bus = Bus()
for request in requests:
if isinstance(request, dict):
type = request['type']
(module, name) = ('.'.join(type.split('.')[:-1]), type.split('.')[-1])
module = importlib.import_module(module)
request = getattr(module, name)(bus=self.http_bus, **request)
elif isinstance(request, HttpRequest):
request.bus = self.http_bus
else:
raise RuntimeError('Request should either be a dict or a ' +
'HttpRequest object, {} found'.format(type(request)))
self.requests.append(request)
def run(self):
super().run()
while not self.should_stop():
for request in self.requests:
if not request.is_alive() and (
request.last_call_timestamp is None or
time.time() - request.last_call_timestamp > request.poll_seconds):
response = request.execute()
print('**** RESPONSE: {}'.format(response))
time.sleep(0.1)
def send_message(self, msg):
pass
def on_stop(self):
pass
def stop(self):
super().stop()
self.http_bus.post(msg)
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,88 @@
import copy
import json
import re
import requests
import time
from frozendict import frozendict
from threading import Thread
from platypush.message.response import Response
class HttpRequest(Thread):
poll_seconds = 60
timeout = 5
bus = None
last_call_timestamp = None
class HttpRequestArguments(object):
def __init__(self, url, method='get', *args, **kwargs):
self.method = method.lower()
self.url = url
self.args = args
self.kwargs = kwargs
def __init__(self, args, poll_seconds=None, timeout=None, bus=None, **kwargs):
super().__init__()
self.poll_seconds = poll_seconds or self.poll_seconds
self.timeout = timeout or self.timeout
self.bus = bus or self.bus
if isinstance(args, self.HttpRequestArguments):
self.args = args
elif isinstance(args, dict):
self.args = self.HttpRequestArguments(**args)
else:
raise RuntimeError('{} is neither a dictionary nor an HttpRequest')
def execute(self):
self.last_call_timestamp = time.time()
method = getattr(requests, self.args.method.lower())
response = method(self.args.url, *self.args.args, **self.args.kwargs)
response.raise_for_status()
return response
class JsonHttpRequest(HttpRequest):
def __init__(self, path=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.path = path
self.seen_entries = set()
def execute(self):
is_first_call = self.last_call_timestamp is None
response = super().execute().json()
new_entries = []
if self.path:
m = re.match('\$\{\s*(.*)\s*\}', self.path)
response = eval(m.group(1))
for entry in response:
flattened_entry = deep_freeze(entry)
if flattened_entry not in self.seen_entries:
new_entries.append(entry)
self.seen_entries.add(flattened_entry)
return new_entries
def deep_freeze(x):
if isinstance(x, str) or not hasattr(x, "__len__") :
return x
if hasattr(x, "keys") and hasattr(x, "values") :
return frozendict({deep_freeze(k) : deep_freeze(v) for k,v in x.items()})
if hasattr(x, "__getitem__") :
return tuple(map(deep_freeze, x))
return frozenset(map(deep_freeze,x))
# vim:sw=4:ts=4:et:

View File

@ -11,6 +11,9 @@ websocket-client
# HTTP backend support
flask
# HTTP poll backend support
frozendict
# Database plugin support
sqlalchemy

View File

@ -64,6 +64,7 @@ setup(
'Support for Apache Kafka backend': ['kafka-python'],
'Support for Pushbullet backend': ['requests', 'websocket-client'],
'Support for HTTP backend': ['flask'],
'Support for HTTP poll backend': ['frozendict'],
'Support for database plugin': ['sqlalchemy'],
'Support for Philips Hue plugin': ['phue'],
'Support for MPD/Mopidy music server plugin': ['python-mpd2'],
@ -71,7 +72,7 @@ setup(
'Support for text2speech plugin': ['mplayer'],
'Support for OMXPlayer plugin': ['omxplayer'],
'Support for YouTube in the OMXPlayer plugin': ['youtube-dl'],
'Support for Google Assistant': ['google-assistant-sdk[samples]'],
'Support for Google Assistant': ['google-assistant-library'],
# 'Support for Flic buttons': ['git+ssh://git@github.com/50ButtonsEach/fliclib-linux-hci']
},
)