Making the HTTP poll events work

This commit is contained in:
Fabio Manganiello 2018-01-10 03:14:27 +01:00
parent d0759765b2
commit 19cfafb2c7
5 changed files with 89 additions and 34 deletions

View file

@ -1,6 +1,8 @@
import importlib import importlib
import time import time
from threading import Thread
from platypush.bus import Bus from platypush.bus import Bus
from platypush.backend import Backend from platypush.backend import Backend
from platypush.backend.http.request import HttpRequest from platypush.backend.http.request import HttpRequest
@ -17,14 +19,17 @@ class HttpPollBackend(Backend):
method: GET method: GET
type: platypush.backend.http.request.JsonHttpRequest type: platypush.backend.http.request.JsonHttpRequest
args: args:
url: https://hub-api.booking.com/v1/hotels/84326/reservations url: https://host.com/api/v1/endpoint
headers: headers:
X-Booking-Auth-Token: UXsYtIMJKCJB07/P/5Tz1iV8lzVY5kVVF0ZEnQRe+cg0 Token: TOKEN
params: params:
updatedSince: 2018-01-09 updatedSince: 1m
timeout: 5 # Times out after 5 seconds (default)
poll_seconds: 10 # Check for updates on this endpoint every 10 seconds (default: 60) poll_seconds: 60 # Check for updates on this endpoint every 60 seconds (default)
timeout: 5 # Times out after 5 seconds (default) path: ${response['items']} # Path in the JSON to check for new items.
# Python expressions are supported.
# Note that 'response' identifies the JSON root.
# Default value: JSON root.
""" """
def __init__(self, requests, *args, **kwargs): def __init__(self, requests, *args, **kwargs):
@ -34,22 +39,19 @@ class HttpPollBackend(Backend):
""" """
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.requests = [] self.requests = []
self.http_bus = Bus()
for request in requests: for request in requests:
if isinstance(request, dict): if isinstance(request, dict):
type = request['type'] type = request['type']
(module, name) = ('.'.join(type.split('.')[:-1]), type.split('.')[-1]) (module, name) = ('.'.join(type.split('.')[:-1]), type.split('.')[-1])
module = importlib.import_module(module) module = importlib.import_module(module)
request = getattr(module, name)(bus=self.http_bus, **request) request = getattr(module, name)(**request)
elif isinstance(request, HttpRequest): elif not isinstance(request, HttpRequest):
request.bus = self.http_bus
else:
raise RuntimeError('Request should either be a dict or a ' + raise RuntimeError('Request should either be a dict or a ' +
'HttpRequest object, {} found'.format(type(request))) 'HttpRequest object, {} found'.format(type(request)))
request.bus = self.bus
self.requests.append(request) self.requests.append(request)
@ -58,16 +60,14 @@ class HttpPollBackend(Backend):
while not self.should_stop(): while not self.should_stop():
for request in self.requests: for request in self.requests:
if not request.is_alive() and ( if time.time() - request.last_request_timestamp > request.poll_seconds:
request.last_call_timestamp is None or request.execute()
time.time() - request.last_call_timestamp > request.poll_seconds):
response = request.execute() time.sleep(0.1) # Prevent a tight loop
print('**** RESPONSE: {}'.format(response))
time.sleep(0.1)
def send_message(self, msg): def send_message(self, msg):
self.http_bus.post(msg) pass
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,19 +1,19 @@
import copy import copy
import importlib
import json import json
import re import re
import requests import requests
import time import time
from datetime import date
from frozendict import frozendict from frozendict import frozendict
from threading import Thread from threading import Thread
from platypush.message.response import Response from platypush.message.event.http import HttpEvent
class HttpRequest(Thread): class HttpRequest(object):
poll_seconds = 60 poll_seconds = 60
timeout = 5 timeout = 5
bus = None
last_call_timestamp = None
class HttpRequestArguments(object): class HttpRequestArguments(object):
@ -24,12 +24,13 @@ class HttpRequest(Thread):
self.kwargs = kwargs self.kwargs = kwargs
def __init__(self, args, poll_seconds=None, timeout=None, bus=None, **kwargs): def __init__(self, args, bus=None, poll_seconds=None, timeout=None, **kwargs):
super().__init__() super().__init__()
self.poll_seconds = poll_seconds or self.poll_seconds self.poll_seconds = poll_seconds or self.poll_seconds
self.timeout = timeout or self.timeout self.timeout = timeout or self.timeout
self.bus = bus or self.bus self.bus = bus
self.last_request_timestamp = 0
if isinstance(args, self.HttpRequestArguments): if isinstance(args, self.HttpRequestArguments):
self.args = args self.args = args
@ -38,14 +39,37 @@ class HttpRequest(Thread):
else: else:
raise RuntimeError('{} is neither a dictionary nor an HttpRequest') raise RuntimeError('{} is neither a dictionary nor an HttpRequest')
self.request_args = {
'method': self.args.method, 'url': self.args.url, **self.args.kwargs
}
def execute(self): def execute(self):
self.last_call_timestamp = time.time() def _thread_func():
is_first_call = self.last_request_timestamp == 0
self.last_request_timestamp = time.time()
method = getattr(requests, self.args.method.lower()) method = getattr(requests, self.args.method.lower())
response = method(self.args.url, *self.args.args, **self.args.kwargs) response = method(self.args.url, *self.args.args, **self.args.kwargs)
response.raise_for_status() new_items = self.get_new_items(response)
return response
if new_items and not is_first_call and self.bus:
event = HttpEvent(dict(self), new_items)
self.bus.post(event)
response.raise_for_status()
Thread(target=_thread_func).start()
def get_new_items(self, response):
""" Gets new items out of a response """
raise("get_new_items must be implemented in a derived class")
def __iter__(self):
for (key, value) in self.request_args.items():
yield (key, value)
class JsonHttpRequest(HttpRequest): class JsonHttpRequest(HttpRequest):
@ -55,9 +79,8 @@ class JsonHttpRequest(HttpRequest):
self.seen_entries = set() self.seen_entries = set()
def execute(self): def get_new_items(self, response):
is_first_call = self.last_call_timestamp is None response = response.json()
response = super().execute().json()
new_entries = [] new_entries = []
if self.path: if self.path:

View file

@ -48,6 +48,8 @@ class PushbulletBackend(Backend):
return {} return {}
def _should_skip_last_received_msg(self, msg): def _should_skip_last_received_msg(self, msg):
if not isinstance(msg, dict): return True # We received something weird
is_duplicate=False is_duplicate=False
last_msg = self._last_received_msg[msg['type']] last_msg = self._last_received_msg[msg['type']]

View file

@ -1,8 +1,11 @@
import copy
import json import json
import random import random
import re import re
import threading import threading
from datetime import date
from platypush.config import Config from platypush.config import Config
from platypush.message import Message from platypush.message import Message
from platypush.utils import get_event_class_by_type from platypush.utils import get_event_class_by_type
@ -157,6 +160,9 @@ class Event(Message):
the message into a UTF-8 JSON string the message into a UTF-8 JSON string
""" """
args = copy.deepcopy(self.args)
flatten(args)
return json.dumps({ return json.dumps({
'type' : 'event', 'type' : 'event',
'target' : self.target, 'target' : self.target,
@ -164,7 +170,7 @@ class Event(Message):
'id' : self.id if hasattr(self, 'id') else None, 'id' : self.id if hasattr(self, 'id') else None,
'args' : { 'args' : {
'type' : self.type, 'type' : self.type,
**self.args, **args
}, },
}) })
@ -206,5 +212,20 @@ class StopEvent(Event):
return self.args['thread_id'] == threading.get_ident() return self.args['thread_id'] == threading.get_ident()
def flatten(args):
if isinstance(args, dict):
for (key,value) in args.items():
if isinstance(value, date):
args[key] = value.isoformat()
elif isinstance(value, dict) or isinstance(value, list):
flatten(args[key])
elif isinstance(args, list):
for i in range(0,len(args)):
if isinstance(args[i], date):
args[i] = value.isoformat()
elif isinstance(args[i], dict) or isinstance(args[i], list):
flatten(args[i])
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -0,0 +1,9 @@
from platypush.message.event import Event
class HttpEvent(Event):
def __init__(self, request, response, *args, **kwargs):
super().__init__(request=request, response=response, *args, **kwargs)
# vim:sw=4:ts=4:et: