platypush/platypush/message/request/__init__.py

177 lines
6.0 KiB
Python
Raw Normal View History

import json
import logging
import random
import re
import traceback
from threading import Thread
from platypush.context import get_plugin
from platypush.message import Message
from platypush.message.response import Response
from platypush.utils import get_module_and_method_from_action
class Request(Message):
    """ Request message class """

    # Matches a ``$name`` placeholder that is not escaped by a preceding
    # backslash. Names are made of word characters and dashes.
    _CONTEXT_VAR_RE = re.compile(r'(?<!\\)\$([\w-]+)')

    def __init__(self, target, action, origin=None, id=None, backend=None, args=None):
        """
        Params:
            target  -- Target node [String]
            action  -- Action to be executed (e.g. music.mpd.play) [String]
            origin  -- Origin node [String]
            id      -- Message ID, or None to get it auto-generated
            backend -- Backend connected to the request, where the response will be delivered
            args    -- Additional arguments for the action [Dict]
        """

        self.id = id if id else self._generate_id()
        self.target = target
        self.action = action
        self.origin = origin
        self.args = args if args else {}
        self.backend = backend

    @classmethod
    def build(cls, msg):
        """
        Build a Request out of a message, as parsed by the parent class'
        parse() method.

        Params:
            msg -- Message to be parsed. The parsed result must provide the
                   'target' and 'action' keys; 'args', 'id' and 'origin'
                   are optional
        """

        msg = super().parse(msg)
        args = {
            'target': msg['target'],
            'action': msg['action'],
            'args': msg['args'] if 'args' in msg else {},
        }

        args['id'] = msg['id'] if 'id' in msg else cls._generate_id()
        if 'origin' in msg:
            args['origin'] = msg['origin']

        return cls(**args)

    @staticmethod
    def _generate_id():
        """ Generate a random message ID made of 32 hexadecimal digits """
        return ''.join('%.2x' % random.randint(0, 255) for _ in range(16))

    def _execute_procedure(self, *args, **kwargs):
        """
        Run the configured procedure named by the last dot-separated token
        of the action. Positional and keyword arguments are forwarded to
        the procedure's execute() method, whose return value is returned.
        """

        # Imported here rather than at module level, as in the original code
        from platypush.config import Config
        from platypush.procedure import Procedure

        logging.info('Executing procedure request: {}'.format(self.action))
        proc_name = self.action.split('.')[-1]
        proc_config = Config.get_procedures()[proc_name]

        # `async` is a reserved word since Python 3.7, so it can no longer
        # be spelled as a literal keyword argument: it is passed through
        # dict unpacking to keep the callee's interface untouched.
        proc = Procedure.build(name=proc_name, requests=proc_config['actions'],
                               backend=self.backend, id=self.id,
                               **{'async': proc_config['async']})

        return proc.execute(*args, **kwargs)

    def _expand_context(self, **context):
        """
        Return a copy of the request arguments where every $name
        placeholder found in a string value is replaced by the matching
        context value.

        Placeholders preceded by a backslash, and placeholders whose name
        is not in the context, are left untouched. Non-string arguments
        are copied as they are.

        Params:
            context -- Key-valued context used for the expansion
        """

        def _substitute(match):
            var_name = match.group(1)
            if var_name in context:
                # str() so that non-string context values can be embedded
                return str(context[var_name])
            return match.group(0)

        # re.sub handles every placeholder in the string, including
        # those in values that span multiple lines.
        return {
            name: (self._CONTEXT_VAR_RE.sub(_substitute, value)
                   if isinstance(value, str) else value)
            for (name, value) in self.args.items()
        }

    def _send_response(self, response):
        """
        Deliver the response through the backend attached to the request.
        If the request has no backend or no origin the response is only
        logged.
        """

        if self.backend and self.origin:
            self.backend.send_response(response=response, request=self)
        else:
            logging.info('Response whose request has no ' +
                         'origin attached: {}'.format(response))

    def _execute_plugin_action(self, n_tries, context):
        """
        Run the action on its plugin, reloading the plugin and retrying up
        to n_tries times on failure. Exactly one response (the one of the
        last attempt) is sent and returned.
        """

        (module_name, method_name) = get_module_and_method_from_action(self.action)
        plugin = get_plugin(module_name)
        response = None

        try:
            while True:
                try:
                    # Run the action
                    args = self._expand_context(**context)
                    response = plugin.run(method=method_name, **args)

                    if response and response.is_error():
                        raise RuntimeError('Response processed with errors: {}'.format(response))

                    logging.info('Processed response from plugin {}: {}'.
                                 format(plugin, response))
                    break
                except Exception as e:
                    # Retry mechanism
                    response = Response(output=None, errors=[str(e), traceback.format_exc()])
                    logging.exception(e)

                    if not n_tries:
                        break

                    n_tries -= 1
                    logging.info('Reloading plugin {} and retrying'.format(module_name))
                    get_plugin(module_name, reload=True)
                    plugin = get_plugin(module_name)
        finally:
            # Also runs if reloading the plugin fails, so that the origin
            # still gets the error response of the last attempt
            self._send_response(response)

        return response

    def execute(self, n_tries=1, _async=True, **context):
        """
        Execute this request and returns a Response object

        Params:
            n_tries -- Number of times a failed plugin action is retried
                       (after reloading the plugin) before the error
                       response is returned
            _async  -- If True, the request will be run asynchronously and the
                       response posted on the bus when available (default),
                       otherwise the current thread will wait for the response
                       to be returned synchronously. For backward
                       compatibility the value can also be passed as `async`
                       (e.g. through **{'async': False}).
            context -- Key-valued context. Example:
                       context = (group_name='Kitchen lights')
                       request.args:
                       - group: $group_name # will be expanded as "Kitchen lights")

        Returns the response when run synchronously, None otherwise.
        """

        # `async` is a reserved word since Python 3.7 and can't be a named
        # parameter any longer: callers still using it land in **context.
        if 'async' in context:
            _async = context.pop('async')

        def _thread_func(n_tries):
            if not self.action.startswith('procedure.'):
                return self._execute_plugin_action(n_tries, context)

            try:
                response = self._execute_procedure(n_tries=n_tries)
            except Exception as e:
                # Report the failure to the origin, then let the original
                # exception propagate to the caller
                logging.exception(e)
                self._send_response(
                    Response(output=None, errors=[str(e), traceback.format_exc()]))
                raise

            self._send_response(response)
            return response

        if _async:
            Thread(target=_thread_func, args=(n_tries,)).start()
        else:
            return _thread_func(n_tries)

    def __str__(self):
        """
        Overrides the str() operator and converts
        the message into a UTF-8 JSON string
        """

        return json.dumps({
            'type': 'request',
            'target': self.target,
            'action': self.action,
            'args': self.args,
            'origin': self.origin if hasattr(self, 'origin') else None,
            'id': self.id if hasattr(self, 'id') else None,
        })
# vim:sw=4:ts=4:et: