From 19cfafb2c7958bbbf01dc051b88ebb662a2180a9 Mon Sep 17 00:00:00 2001 From: Fabio Manganiello Date: Wed, 10 Jan 2018 03:14:27 +0100 Subject: [PATCH] Making the HTTP poll events work --- platypush/backend/http/poll/__init__.py | 38 ++++++++-------- platypush/backend/http/request/__init__.py | 51 ++++++++++++++++------ platypush/backend/pushbullet/__init__.py | 2 + platypush/message/event/__init__.py | 23 +++++++++- platypush/message/event/http/__init__.py | 9 ++++ 5 files changed, 89 insertions(+), 34 deletions(-) create mode 100644 platypush/message/event/http/__init__.py diff --git a/platypush/backend/http/poll/__init__.py b/platypush/backend/http/poll/__init__.py index 2d6ab34740..1b8acc7965 100644 --- a/platypush/backend/http/poll/__init__.py +++ b/platypush/backend/http/poll/__init__.py @@ -1,6 +1,8 @@ import importlib import time +from threading import Thread + from platypush.bus import Bus from platypush.backend import Backend from platypush.backend.http.request import HttpRequest @@ -17,14 +19,17 @@ class HttpPollBackend(Backend): method: GET type: platypush.backend.http.request.JsonHttpRequest args: - url: https://hub-api.booking.com/v1/hotels/84326/reservations + url: https://host.com/api/v1/endpoint headers: - X-Booking-Auth-Token: UXsYtIMJKCJB07/P/5Tz1iV8lzVY5kVVF0ZEnQRe+cg0 + Token: TOKEN params: - updatedSince: 2018-01-09 - - poll_seconds: 10 # Check for updates on this endpoint every 10 seconds (default: 60) - timeout: 5 # Times out after 5 seconds (default) + updatedSince: 1m + timeout: 5 # Times out after 5 seconds (default) + poll_seconds: 60 # Check for updates on this endpoint every 60 seconds (default) + path: ${response['items']} # Path in the JSON to check for new items. + # Python expressions are supported. + # Note that 'response' identifies the JSON root. + # Default value: JSON root. """ def __init__(self, requests, *args, **kwargs): @@ -34,22 +39,19 @@ class HttpPollBackend(Backend): """ super().__init__(*args, **kwargs) - self.requests = [] - self.http_bus = Bus() for request in requests: if isinstance(request, dict): type = request['type'] (module, name) = ('.'.join(type.split('.')[:-1]), type.split('.')[-1]) module = importlib.import_module(module) - request = getattr(module, name)(bus=self.http_bus, **request) - elif isinstance(request, HttpRequest): - request.bus = self.http_bus - else: + request = getattr(module, name)(**request) + elif not isinstance(request, HttpRequest): raise RuntimeError('Request should either be a dict or a ' + 'HttpRequest object, {} found'.format(type(request))) + request.bus = self.bus self.requests.append(request) @@ -58,16 +60,14 @@ class HttpPollBackend(Backend): while not self.should_stop(): for request in self.requests: - if not request.is_alive() and ( - request.last_call_timestamp is None or - time.time() - request.last_call_timestamp > request.poll_seconds): - response = request.execute() - print('**** RESPONSE: {}'.format(response)) + if time.time() - request.last_request_timestamp > request.poll_seconds: + request.execute() + + time.sleep(0.1) # Prevent a tight loop - time.sleep(0.1) def send_message(self, msg): - self.http_bus.post(msg) + pass # vim:sw=4:ts=4:et: diff --git a/platypush/backend/http/request/__init__.py b/platypush/backend/http/request/__init__.py index 751461b230..01e67a54e4 100644 --- a/platypush/backend/http/request/__init__.py +++ b/platypush/backend/http/request/__init__.py @@ -1,19 +1,19 @@ import copy +import importlib import json import re import requests import time +from datetime import date from frozendict import frozendict from threading import Thread -from platypush.message.response import Response +from platypush.message.event.http import HttpEvent -class HttpRequest(Thread): +class HttpRequest(object): poll_seconds = 60 timeout = 5 - bus = None - last_call_timestamp = None class HttpRequestArguments(object): @@ -24,12 +24,13 @@ class HttpRequest(Thread): self.kwargs = kwargs - def __init__(self, args, poll_seconds=None, timeout=None, bus=None, **kwargs): + def __init__(self, args, bus=None, poll_seconds=None, timeout=None, **kwargs): super().__init__() self.poll_seconds = poll_seconds or self.poll_seconds self.timeout = timeout or self.timeout - self.bus = bus or self.bus + self.bus = bus + self.last_request_timestamp = 0 if isinstance(args, self.HttpRequestArguments): self.args = args @@ -38,14 +39,37 @@ class HttpRequest(Thread): else: raise RuntimeError('{} is neither a dictionary nor an HttpRequest') + self.request_args = { + 'method': self.args.method, 'url': self.args.url, **self.args.kwargs + } + def execute(self): - self.last_call_timestamp = time.time() + def _thread_func(): + is_first_call = self.last_request_timestamp == 0 + self.last_request_timestamp = time.time() - method = getattr(requests, self.args.method.lower()) - response = method(self.args.url, *self.args.args, **self.args.kwargs) - response.raise_for_status() - return response + method = getattr(requests, self.args.method.lower()) + response = method(self.args.url, *self.args.args, **self.args.kwargs) + new_items = self.get_new_items(response) + + if new_items and not is_first_call and self.bus: + event = HttpEvent(dict(self), new_items) + self.bus.post(event) + + response.raise_for_status() + + Thread(target=_thread_func).start() + + + def get_new_items(self, response): + """ Gets new items out of a response """ + raise("get_new_items must be implemented in a derived class") + + + def __iter__(self): + for (key, value) in self.request_args.items(): + yield (key, value) class JsonHttpRequest(HttpRequest): @@ -55,9 +79,8 @@ class JsonHttpRequest(HttpRequest): self.seen_entries = set() - def execute(self): - is_first_call = self.last_call_timestamp is None - response = super().execute().json() + def get_new_items(self, response): + response = response.json() new_entries = [] if self.path: diff --git a/platypush/backend/pushbullet/__init__.py b/platypush/backend/pushbullet/__init__.py index a9d9661c97..61637bb0bc 100644 --- a/platypush/backend/pushbullet/__init__.py +++ b/platypush/backend/pushbullet/__init__.py @@ -48,6 +48,8 @@ class PushbulletBackend(Backend): return {} def _should_skip_last_received_msg(self, msg): + if not isinstance(msg, dict): return True # We received something weird + is_duplicate=False last_msg = self._last_received_msg[msg['type']] diff --git a/platypush/message/event/__init__.py b/platypush/message/event/__init__.py index d7a1f039cb..1e23c47656 100644 --- a/platypush/message/event/__init__.py +++ b/platypush/message/event/__init__.py @@ -1,8 +1,11 @@ +import copy import json import random import re import threading +from datetime import date + from platypush.config import Config from platypush.message import Message from platypush.utils import get_event_class_by_type @@ -157,6 +160,9 @@ class Event(Message): the message into a UTF-8 JSON string """ + args = copy.deepcopy(self.args) + flatten(args) + return json.dumps({ 'type' : 'event', 'target' : self.target, @@ -164,7 +170,7 @@ class Event(Message): 'id' : self.id if hasattr(self, 'id') else None, 'args' : { 'type' : self.type, - **self.args, + **args }, }) @@ -206,5 +212,20 @@ class StopEvent(Event): return self.args['thread_id'] == threading.get_ident() +def flatten(args): + if isinstance(args, dict): + for (key,value) in args.items(): + if isinstance(value, date): + args[key] = value.isoformat() + elif isinstance(value, dict) or isinstance(value, list): + flatten(args[key]) + elif isinstance(args, list): + for i in range(0,len(args)): + if isinstance(args[i], date): + args[i] = value.isoformat() + elif isinstance(args[i], dict) or isinstance(args[i], list): + flatten(args[i]) + + # vim:sw=4:ts=4:et: diff --git a/platypush/message/event/http/__init__.py b/platypush/message/event/http/__init__.py new file mode 100644 index 0000000000..7dc83208ec --- /dev/null +++ b/platypush/message/event/http/__init__.py @@ -0,0 +1,9 @@ +from platypush.message.event import Event + +class HttpEvent(Event): + def __init__(self, request, response, *args, **kwargs): + super().__init__(request=request, response=response, *args, **kwargs) + + +# vim:sw=4:ts=4:et: +