Implemented synchronization with webhook responses.

When a client triggers a `WebhookEvent` by calling a configured webhook
over `/hook/<hook_name>`, the server will now wait for the configured
`@hook` function to complete and it will return the returned response
back to the client.

This makes webhooks much more powerful, as they can be used to proxy
HTTP calls or other services, and in general return something to the
client instead of just executing actions.
This commit is contained in:
Fabio Manganiello 2022-08-30 23:35:19 +02:00
parent e08947a3b7
commit c3fa3315f5
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
3 changed files with 107 additions and 40 deletions

View file

@ -1,9 +1,11 @@
import json import json
from flask import Blueprint, abort, request, Response from flask import Blueprint, abort, request
from flask.wrappers import Response
from platypush.backend.http.app import template_folder from platypush.backend.http.app import template_folder
from platypush.backend.http.app.utils import logger, send_message from platypush.backend.http.app.utils import logger, send_message
from platypush.config import Config
from platypush.message.event.http.hook import WebhookEvent from platypush.message.event.http.hook import WebhookEvent
@ -15,8 +17,10 @@ __routes__ = [
] ]
@hook.route('/hook/<hook_name>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS']) @hook.route(
def _hook(hook_name): '/hook/<hook_name>', methods=['GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'OPTIONS']
)
def hook_route(hook_name):
"""Endpoint for custom webhooks""" """Endpoint for custom webhooks"""
event_args = { event_args = {
@ -28,20 +32,36 @@ def _hook(hook_name):
} }
if event_args['data']: if event_args['data']:
# noinspection PyBroadException
try: try:
event_args['data'] = json.loads(event_args['data']) event_args['data'] = json.loads(event_args['data'])
except Exception as e: except Exception as e:
logger().warning('Not a valid JSON string: {}: {}'.format(event_args['data'], str(e))) logger().warning(
'Not a valid JSON string: %s: %s', event_args['data'], str(e)
)
event = WebhookEvent(**event_args) event = WebhookEvent(**event_args)
matching_hooks = [
hook
for hook in Config.get_event_hooks().values()
if hook.condition.type == WebhookEvent
and hook.condition.args.get('hook') == hook_name
and request.method.lower()
== hook.condition.args.get('method', request.method).lower()
]
try: try:
send_message(event) send_message(event)
return Response(json.dumps({'status': 'ok', **event_args}), mimetype='application/json')
# If there are matching hooks, wait for their completion before returning
if matching_hooks:
return event.wait_response(timeout=60)
return Response(
json.dumps({'status': 'ok', **event_args}), mimetype='application/json'
)
except Exception as e: except Exception as e:
logger().exception(e) logger().exception(e)
logger().error('Error while dispatching webhook event {}: {}'.format(event, str(e))) logger().error('Error while dispatching webhook event %s: %s', event, str(e))
abort(500, str(e)) abort(500, str(e))

View file

@ -18,7 +18,7 @@ def parse(msg):
"""Builds a dict given another dictionary or """Builds a dict given another dictionary or
a JSON UTF-8 encoded string/bytearray""" a JSON UTF-8 encoded string/bytearray"""
if isinstance(msg, bytes) or isinstance(msg, bytearray): if isinstance(msg, (bytes, bytearray)):
msg = msg.decode('utf-8') msg = msg.decode('utf-8')
if isinstance(msg, str): if isinstance(msg, str):
try: try:
@ -30,7 +30,7 @@ def parse(msg):
return msg return msg
class EventCondition(object): class EventCondition:
"""Event hook condition class""" """Event hook condition class"""
def __init__(self, type=Event.__class__, priority=None, **kwargs): def __init__(self, type=Event.__class__, priority=None, **kwargs):
@ -64,8 +64,7 @@ class EventCondition(object):
rule = parse(rule) rule = parse(rule)
assert isinstance(rule, dict), f'Not a valid rule: {rule}' assert isinstance(rule, dict), f'Not a valid rule: {rule}'
type = get_event_class_by_type( type = get_event_class_by_type(rule.pop('type') if 'type' in rule else 'Event')
rule.pop('type') if 'type' in rule else 'Event')
args = {} args = {}
for (key, value) in rule.items(): for (key, value) in rule.items():
@ -99,7 +98,7 @@ class EventAction(Request):
return super().build(action) return super().build(action)
class EventHook(object): class EventHook:
"""Event hook class. It consists of one conditions and """Event hook class. It consists of one conditions and
one or multiple actions to be executed""" one or multiple actions to be executed"""
@ -163,7 +162,9 @@ class EventHook(object):
if result.is_match: if result.is_match:
logger.info('Running hook {} triggered by an event'.format(self.name)) logger.info('Running hook {} triggered by an event'.format(self.name))
threading.Thread(target=_thread_func, name='Event-' + self.name, args=(result,)).start() threading.Thread(
target=_thread_func, name='Event-' + self.name, args=(result,)
).start()
def hook(event_type=Event, **condition): def hook(event_type=Event, **condition):
@ -172,8 +173,14 @@ def hook(event_type=Event, **condition):
f.condition = EventCondition(type=event_type, **condition) f.condition = EventCondition(type=event_type, **condition)
@wraps(f) @wraps(f)
def wrapped(*args, **kwargs): def wrapped(event, *args, **kwargs):
return exec_wrapper(f, *args, **kwargs) from platypush.message.event.http.hook import WebhookEvent
response = exec_wrapper(f, event, *args, **kwargs)
if isinstance(event, WebhookEvent):
event.send_response(response)
return response
return wrapped return wrapped

View file

@ -1,11 +1,26 @@
import json
import uuid
from platypush.message.event import Event from platypush.message.event import Event
from platypush.utils import get_redis
class WebhookEvent(Event): class WebhookEvent(Event):
""" """
Event triggered when a custom webhook is called. Event triggered when a custom webhook is called.
""" """
def __init__(self, *argv, hook, method, data=None, args=None, headers=None, **kwargs): def __init__(
self,
*argv,
hook,
method,
data=None,
args=None,
headers=None,
response=None,
**kwargs,
):
""" """
:param hook: Name of the invoked web hook, from http://host:port/hook/<hook> :param hook: Name of the invoked web hook, from http://host:port/hook/<hook>
:type hook: str :type hook: str
@ -21,10 +36,35 @@ class WebhookEvent(Event):
:param headers: Request headers :param headers: Request headers
:type args: dict :type args: dict
"""
super().__init__(hook=hook, method=method, data=data, args=args or {}, :param response: Response returned by the hook.
headers=headers or {}, *argv, **kwargs) :type args: dict | list | str
"""
# This queue is used to synchronize with the hook and wait for its completion
kwargs['response_queue'] = kwargs.get(
'response_queue', f'platypush/webhook/{str(uuid.uuid1())}'
)
super().__init__(
*argv,
hook=hook,
method=method,
data=data,
args=args or {},
headers=headers or {},
response=response,
**kwargs,
)
def send_response(self, response):
output = response.output
if isinstance(output, (dict, list)):
output = json.dumps(output)
get_redis().rpush(self.args['response_queue'], output)
def wait_response(self, timeout=None):
return get_redis().blpop(self.args['response_queue'], timeout=timeout)[1]
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et: