platypush/platypush/backend/http/__init__.py

265 lines
9 KiB
Python
Raw Normal View History

2018-01-29 13:47:21 +01:00
import asyncio
import inspect
2018-01-04 02:45:23 +01:00
import logging
import json
2018-01-29 13:47:21 +01:00
import os
2018-05-05 04:37:22 +02:00
import re
2018-01-04 17:20:35 +01:00
import time
2018-01-04 02:45:23 +01:00
2018-01-29 13:47:21 +01:00
from threading import Thread
2018-01-04 02:45:23 +01:00
from multiprocessing import Process
from flask import Flask, abort, jsonify, request as http_request, render_template, send_from_directory
from redis import Redis
2018-01-04 02:45:23 +01:00
2018-01-04 17:20:35 +01:00
from platypush.config import Config
2018-01-04 02:45:23 +01:00
from platypush.message import Message
2018-01-29 13:47:21 +01:00
from platypush.message.event import Event
from platypush.message.event.web.widget import WidgetUpdateEvent
2018-01-04 02:45:23 +01:00
from platypush.message.request import Request
from .. import Backend
class HttpBackend(Backend):
""" Example interaction with the HTTP backend to make requests:
$ curl -XPOST -H 'Content-Type: application/json' -H "X-Token: your_token" \
-d '{"type":"request","target":"nodename","action":"tts.say","args": {"phrase":"This is a test"}}' \
http://localhost:8008/execute """
2018-01-04 02:45:23 +01:00
2018-02-09 20:54:02 +01:00
websocket_ping_tries = 3
websocket_ping_timeout = 60.0
hidden_plugins = {
'assistant.google'
}
2018-02-09 20:54:02 +01:00
2018-01-29 16:34:00 +01:00
def __init__(self, port=8008, websocket_port=8009, disable_websocket=False,
redis_queue='platypush_flask_mq', token=None, dashboard={}, **kwargs):
2018-01-04 02:45:23 +01:00
super().__init__(**kwargs)
2018-01-04 02:45:23 +01:00
self.port = port
2018-01-29 13:47:21 +01:00
self.websocket_port = websocket_port
self.redis_queue = redis_queue
2018-01-04 02:45:23 +01:00
self.token = token
self.dashboard = dashboard
2018-01-04 17:20:35 +01:00
self.server_proc = None
2018-01-29 16:34:00 +01:00
self.disable_websocket = disable_websocket
2018-01-29 13:47:21 +01:00
self.websocket_thread = None
self.active_websockets = set()
self.redis = Redis()
2018-01-04 02:45:23 +01:00
def send_message(self, msg):
2018-01-04 17:20:35 +01:00
logging.warning('Use cURL or any HTTP client to query the HTTP backend')
2018-01-04 02:45:23 +01:00
2018-01-04 17:20:35 +01:00
def stop(self):
logging.info('Received STOP event on HttpBackend')
2018-01-29 13:47:21 +01:00
2018-01-04 17:20:35 +01:00
if self.server_proc:
self.server_proc.terminate()
self.server_proc.join()
2018-01-04 02:45:23 +01:00
2018-01-29 13:47:21 +01:00
def notify_web_clients(self, event):
2018-01-29 16:34:00 +01:00
import websockets
2018-01-29 13:47:21 +01:00
async def send_event(websocket):
await websocket.send(str(event))
loop = asyncio.new_event_loop()
for websocket in self.active_websockets:
try:
loop.run_until_complete(send_event(websocket))
except websockets.exceptions.ConnectionClosed:
logging.info('Client connection lost')
def redis_poll(self):
while not self.should_stop():
msg = self.redis.blpop(self.redis_queue)
msg = Message.build(json.loads(msg[1].decode('utf-8')))
self.bus.post(msg)
2018-01-04 02:45:23 +01:00
2018-01-29 13:47:21 +01:00
def webserver(self):
basedir = os.path.dirname(inspect.getfile(self.__class__))
template_dir = os.path.join(basedir, 'templates')
static_dir = os.path.join(basedir, 'static')
2018-01-29 13:47:21 +01:00
app = Flask(__name__, template_folder=template_dir)
Thread(target=self.redis_poll).start()
2018-01-04 02:45:23 +01:00
@app.route('/execute', methods=['POST'])
2018-01-29 13:47:21 +01:00
def execute():
args = json.loads(http_request.data.decode('utf-8'))
token = http_request.headers['X-Token'] if 'X-Token' in http_request.headers else None
if token != self.token: abort(401)
2018-01-04 02:45:23 +01:00
msg = Message.build(args)
2018-01-04 17:20:35 +01:00
logging.info('Received message on the HTTP backend: {}'.format(msg))
2018-01-04 02:45:23 +01:00
2018-01-04 17:20:35 +01:00
if isinstance(msg, Request):
response = msg.execute(async=False)
logging.info('Processing response on the HTTP backend: {}'.format(msg))
return str(response)
2018-01-29 13:47:21 +01:00
elif isinstance(msg, Event):
self.redis.rpush(self.redis_queue, msg)
2018-01-04 02:45:23 +01:00
2018-01-04 17:20:35 +01:00
return jsonify({ 'status': 'ok' })
2018-01-29 13:47:21 +01:00
@app.route('/')
def index():
configured_plugins = Config.get_plugins()
enabled_plugins = {}
hidden_plugins = {}
2018-01-29 13:47:21 +01:00
for plugin, conf in configured_plugins.items():
template_file = os.path.join('plugins', plugin + '.html')
if os.path.isfile(os.path.join(template_dir, template_file)):
if plugin in self.hidden_plugins:
hidden_plugins[plugin] = conf
else:
enabled_plugins[plugin] = conf
2018-01-29 13:47:21 +01:00
return render_template('index.html', plugins=enabled_plugins, hidden_plugins=hidden_plugins,
2018-01-29 13:47:21 +01:00
token=self.token, websocket_port=self.websocket_port)
@app.route('/widget/<widget>', methods=['POST'])
def widget_update(widget):
event = WidgetUpdateEvent(
widget=widget, **(json.loads(http_request.data.decode('utf-8'))))
self.redis.rpush(self.redis_queue, event)
return jsonify({ 'status': 'ok' })
2018-01-29 13:47:21 +01:00
@app.route('/static/<path>')
def static_path(path):
return send_from_directory(static_dir, filename)
@app.route('/dashboard')
def dashboard():
return render_template('dashboard.html', config=self.dashboard, utils=HttpUtils,
token=self.token, websocket_port=self.websocket_port)
2018-01-29 13:47:21 +01:00
return app
def websocket(self):
2018-01-29 16:34:00 +01:00
import websockets
2018-01-29 13:47:21 +01:00
async def register_websocket(websocket, path):
logging.info('New websocket connection from {}'.format(websocket.remote_address[0]))
2018-02-09 20:54:02 +01:00
websocket.remaining_ping_tries = self.websocket_ping_tries
2018-01-29 13:47:21 +01:00
self.active_websockets.add(websocket)
while True:
try:
waiter = await websocket.ping()
await asyncio.wait_for(waiter, timeout=self.websocket_ping_timeout)
time.sleep(self.websocket_ping_timeout)
2018-01-29 13:47:21 +01:00
except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed) as e:
2018-02-09 20:54:02 +01:00
close = False
if isinstance(e, asyncio.TimeoutError):
websocket.remaining_ping_tries -= 1
if websocket.remaining_ping_tries <= 0:
close = True
else:
close = True
if close:
logging.info('Websocket client {} closed connection'
.format(websocket.remote_address[0]))
self.active_websockets.remove(websocket)
break
2018-01-29 13:47:21 +01:00
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(
2018-01-29 14:46:31 +01:00
websockets.serve(register_websocket, '0.0.0.0', self.websocket_port))
2018-01-29 13:47:21 +01:00
loop.run_forever()
def run(self):
super().run()
2018-01-04 17:20:35 +01:00
logging.info('Initialized HTTP backend on port {}'.format(self.port))
2018-01-29 13:47:21 +01:00
webserver = self.webserver()
self.server_proc = Process(target=webserver.run, kwargs={
'debug':True, 'host':'0.0.0.0', 'port':self.port, 'use_reloader':False
2018-01-04 17:20:35 +01:00
})
time.sleep(1)
2018-01-04 02:45:23 +01:00
self.server_proc.start()
2018-01-29 16:34:00 +01:00
if not self.disable_websocket:
self.websocket_thread = Thread(target=self.websocket)
self.websocket_thread.start()
2018-01-04 02:45:23 +01:00
self.server_proc.join()
class HttpUtils(object):
@staticmethod
def widget_columns_to_html_class(columns):
if not isinstance(columns, int):
raise RuntimeError('columns should be a number, got "{}"'.format(columns))
if columns == 1:
return 'one column'
elif columns == 2:
return 'two columns'
elif columns == 3:
return 'three columns'
elif columns == 4:
return 'four columns'
elif columns == 5:
return 'five columns'
elif columns == 6:
return 'six columns'
elif columns == 7:
return 'seven columns'
elif columns == 8:
return 'eight columns'
elif columns == 9:
return 'nine columns'
elif columns == 10:
return 'ten columns'
elif columns == 11:
return 'eleven columns'
elif columns == 12:
return 'twelve columns'
else:
raise RuntimeError('Constraint violation: should be 1 <= columns <= 12, ' +
'got columns={}'.format(columns))
2018-05-05 04:37:22 +02:00
@staticmethod
def search_directory(directory, *extensions, recursive=False):
files = []
if recursive:
for root, subdirs, files in os.walk(directory):
for file in files:
if not extensions or os.path.splitext(file)[1].lower() in extensions:
files.append(os.path.join(root, file))
else:
for file in os.listdir(directory):
if not extensions or os.path.splitext(file)[1].lower() in extensions:
files.append(os.path.join(directory, file))
return files
@classmethod
def search_web_directory(cls, directory, *extensions):
directory = re.sub('^/+', '', directory)
basedir = os.path.dirname(inspect.getfile(cls))
results = cls.search_directory(os.path.join(basedir, directory), *extensions)
return [item[len(basedir):] for item in results]
2018-01-04 02:45:23 +01:00
# vim:sw=4:ts=4:et: