- Proper expansion of the context variables on functional procedure call

- Expanded and refactored tests framework

- Added test_procedure
This commit is contained in:
Fabio Manganiello 2021-02-27 15:01:25 +01:00
parent 9e00428568
commit b4f9472fc5
14 changed files with 257 additions and 126 deletions

2
.gitignore vendored
View file

@ -18,3 +18,5 @@ platypush/notebooks
platypush/requests
/http-client.env.json
/platypush/backend/http/static/css/dist
/tests/etc/scripts
/tests/etc/dashboards

View file

@ -437,7 +437,7 @@ platyvdock rm device_id
## Tests
To run the tests simply run `pytest` from the source root folder - no further configuration required.
To run the tests run the `run_tests.sh` from the source root folder - no further configuration is required.
---

View file

@ -24,7 +24,7 @@ from .utils import set_thread_name
__author__ = 'Fabio Manganiello <info@fabiomanganiello.com>'
__version__ = '0.13.9'
__version__ = '0.20.0'
logger = logging.getLogger('platypush')
@ -186,8 +186,6 @@ class Daemon:
finally:
self.stop_app()
sys.exit(0)
def main():
"""

View file

@ -356,7 +356,12 @@ class Backend(Thread, EventGenerator):
Unregister the Zeroconf service configuration if available.
"""
if self.zeroconf and self.zeroconf_info:
try:
self.zeroconf.unregister_service(self.zeroconf_info)
except Exception as e:
self.logger.warning('Could not register Zeroconf service {}: {}: {}'.format(
self.zeroconf_info.name, type(e).__name__, str(e)))
if self.zeroconf:
self.zeroconf.close()

View file

@ -279,7 +279,7 @@ class HttpBackend(Backend):
self.server_proc.kill()
self.server_proc.wait(timeout=10)
if self.server_proc.poll() is not None:
self.logger.warning('HTTP server process still alive at termination')
self.logger.info('HTTP server process may be still alive at termination')
else:
self.logger.info('HTTP server process terminated')
else:
@ -288,7 +288,7 @@ class HttpBackend(Backend):
if self.server_proc.is_alive():
self.server_proc.kill()
if self.server_proc.is_alive():
self.logger.warning('HTTP server process still alive at termination')
self.logger.info('HTTP server process may be still alive at termination')
else:
self.logger.info('HTTP server process terminated')

View file

@ -77,7 +77,7 @@ class Request(Message):
proc_config = procedures[proc_name]
if is_functional_procedure(proc_config):
kwargs.update(**self.args)
kwargs = {**self.args, **kwargs}
if 'n_tries' in kwargs:
del kwargs['n_tries']

View file

@ -11,7 +11,7 @@ do
if [[ $test_ret != 0 ]]; then
tests_ret=$test_ret
echo "-------------" >&2
echo "Test FAILED: $testcase" >&2
echo "FAILED: $testcase" >&2
echo "-------------" >&2
fi
done

View file

@ -0,0 +1,138 @@
import abc
import logging
import os
import requests
import sys
import time
import unittest
from threading import Thread
from typing import Optional
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
test_dir = os.path.abspath(os.path.dirname(__file__))
conf_dir = os.path.join(test_dir, 'etc')
sys.path.insert(0, os.path.abspath(os.path.join(test_dir, '..')))
from platypush import Daemon, Config, Response
from platypush.message import Message
from platypush.utils import set_timeout, clear_timeout
class TimeoutException(RuntimeError):
def __init__(self, msg):
self.msg = msg
class BaseTest(unittest.TestCase, abc.ABC):
"""
Base class for Platypush tests.
"""
app_start_timeout = 5
request_timeout = 10
config_file = None
db_file = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.app: Optional[Daemon] = None
def setUp(self) -> None:
self.start_daemon()
def tearDown(self):
try:
self.stop_daemon()
finally:
if self.db_file and os.path.isfile(self.db_file):
logging.info('Removing temporary db file {}'.format(self.db_file))
os.unlink(self.db_file)
def start_daemon(self):
logging.info('Starting platypush service')
self.app = Daemon(config_file=self.config_file)
Thread(target=lambda: self.app.run()).start()
logging.info('Sleeping {} seconds while waiting for the daemon to start up'.format(self.app_start_timeout))
time.sleep(self.app_start_timeout)
def stop_daemon(self):
if self.app:
logging.info('Stopping platypush service')
self.app.stop_app()
@staticmethod
def parse_response(response):
response = Message.build(response.json())
assert isinstance(response, Response), 'Expected Response type, got {}'.format(response.__class__.__name__)
return response
@staticmethod
def on_timeout(msg):
def _f(): raise TimeoutException(msg)
return _f
class BaseHttpTest(BaseTest, abc.ABC):
"""
Base class for Platypush HTTP tests.
"""
base_url = None
test_user = 'platypush'
test_pass = 'test'
def setUp(self) -> None:
Config.init(self.config_file)
backends = Config.get_backends()
self.assertTrue('http' in backends, 'Missing HTTP server configuration')
self.base_url = 'http://localhost:{port}'.format(port=backends['http']['port'])
self.db_file = Config.get('main.db')['engine'][len('sqlite:///'):]
super().setUp()
def register_user(self, username: Optional[str] = None, password: Optional[str] = None):
if not username:
username = self.test_user
password = self.test_pass
set_timeout(seconds=self.request_timeout, on_timeout=self.on_timeout('User registration response timed out'))
response = requests.post('{base_url}/register?redirect={base_url}/'.format(base_url=self.base_url), data={
'username': username,
'password': password,
'confirm_password': password,
})
clear_timeout()
return response
def send_request(self, action: str, timeout: Optional[float] = None, args: Optional[dict] = None,
parse_response: bool = True, authenticate: bool = True, **kwargs):
if not timeout:
timeout = self.request_timeout
if not args:
args = {}
auth = (self.test_user, self.test_pass) if authenticate else kwargs.pop('auth', ())
set_timeout(seconds=timeout, on_timeout=self.on_timeout('Receiver response timed out'))
response = requests.post(
'{}/execute'.format(self.base_url),
auth=auth,
json={
'type': 'request',
'action': action,
'args': args,
}, **kwargs
)
clear_timeout()
if parse_response:
response = self.parse_response(response)
return response
def assertEqual(self, first, second, msg=..., expected=None, actual=None) -> None:
if expected is not None and actual is not None:
if not msg:
msg = ''
msg += '\n\tExpected: {expected}\n\tActual: {actual}'.format(expected=expected, actual=actual)
super().assertEqual(first, second, msg)

View file

@ -1,18 +0,0 @@
import os
import sys
from platypush.config import Config
testdir = os.path.dirname(__file__)
sys.path.insert(0, os.path.abspath(os.path.join(testdir, '..')))
config_file = os.path.join(testdir, 'etc', 'config.yaml')
Config.init(config_file)
class TimeoutException(RuntimeError):
def __init__(self, msg):
self.msg = msg
# vim:sw=4:ts=4:et:

View file

@ -1,5 +1,5 @@
main.db:
engine: sqlite:////tmp/platypush-tests.db
engine: sqlite:////tmp/platypush-test-http.db
backend.http:
port: 8123

View file

@ -0,0 +1,22 @@
main.db:
engine: sqlite:////tmp/platypush-test-procedure.db
backend.http:
port: 8124
disable_websocket: True
procedure.write_file:
- action: file.write
args:
file: ${file}
content: ${content}
event.hook.OnCustomEvent:
if:
type: platypush.message.event.custom.CustomEvent
subtype: platypush_test_procedure_from_event
then:
- action: procedure.write_file
args:
file: ${file}
content: ${content}

View file

@ -1,12 +1,12 @@
import logging
import unittest
from .context import config_file
from platypush.event.hook import EventCondition
from platypush.message.event.ping import PingEvent
from . import BaseTest
class TestEventParse(unittest.TestCase):
class TestEventParse(BaseTest):
def setUp(self):
self.condition = EventCondition.build({
'type': 'platypush.message.event.ping.PingEvent',
@ -14,7 +14,6 @@ class TestEventParse(unittest.TestCase):
})
def test_event_parse(self):
logging.info('Starting test on configuration file: {}'.format(config_file))
message = "GARBAGE GARBAGE this is the answer: 42"
event = PingEvent(message=message)
result = event.matches_condition(self.condition)

View file

@ -1,53 +1,29 @@
import os
from .context import config_file, TimeoutException
import logging
import requests
import sys
import time
import unittest
from threading import Thread, Event
from platypush import Daemon
from platypush.config import Config
from platypush.message import Message
from platypush.message.response import Response
from platypush.utils import set_timeout, clear_timeout
from . import BaseHttpTest, conf_dir
class TestHttp(unittest.TestCase):
""" Tests the full flow of a request/response on the HTTP backend.
Runs a remote command over HTTP via shell.exec plugin and gets the output """
class TestHttp(BaseHttpTest):
"""
Tests the full flow of a request/response on the HTTP backend.
Runs a remote command over HTTP via shell.exec plugin and gets the output.
"""
timeout = 10
sleep_secs = 5
db_file = '/tmp/platypush-tests.db'
test_user = 'platypush'
test_pass = 'test'
base_url = 'http://localhost:8123'
expected_registration_redirect = '{base_url}/register?redirect={base_url}/execute'.format(base_url=base_url)
expected_login_redirect = '{base_url}/login?redirect={base_url}/execute'.format(base_url=base_url)
config_file = os.path.join(conf_dir, 'test_http_config.yaml')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.app = None
self._app_started = Event()
def setUp(self):
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
backends = Config.get_backends()
self.assertTrue('http' in backends, 'Missing HTTP server configuration')
self.start_daemon()
logging.info('Sleeping {} seconds while waiting for the daemon to start up'.format(self.sleep_secs))
time.sleep(self.sleep_secs)
super(TestHttp, self).__init__(*args, **kwargs)
def test_http_flow(self):
# An /execute request performed before any user is registered should redirect to the registration page.
response = self.send_request()
self.assertEqual(self.expected_registration_redirect, response.url,
expected_registration_redirect = '{base_url}/register?redirect={base_url}/execute'.format(
base_url=self.base_url)
expected_login_redirect = '{base_url}/login?redirect={base_url}/execute'.format(
base_url=self.base_url)
response = self.send_request(authenticate=False, parse_response=False)
self.assertEqual(expected_registration_redirect, response.url,
'No users registered, but the application did not redirect us to the registration page')
# Emulate a first user registration through form and get the session_token.
@ -58,69 +34,20 @@ class TestHttp(unittest.TestCase):
'The registration form did not redirect to the main panel')
# After a first user has been registered any unauthenticated call to /execute should redirect to /login.
response = self.send_request()
self.assertEqual(self.expected_login_redirect, response.url,
response = self.send_request(authenticate=False, parse_response=False)
self.assertEqual(expected_login_redirect, response.url,
'An unauthenticated request after user registration should result in a login redirect')
# A request authenticated with user/pass should succeed.
response = self.parse_response(self.send_request(auth=(self.test_user, self.test_pass)))
self.assertEqual(response.__class__, Response, 'The request did not return a proper Response object')
response = self.send_request(authenticate=True)
self.assertEqual(response.output.strip(), 'ping', 'The request did not return the expected output')
# A request with the wrong user/pass should fail.
response = self.send_request(auth=('wrong', 'wrong'))
self.assertEqual(self.expected_login_redirect, response.url, 'A request with wrong credentials should fail')
def start_daemon(self):
self.app = Daemon(config_file=config_file)
Thread(target=lambda: self.app.run()).start()
def stop_daemon(self):
if self.app:
self.app.stop_app()
@staticmethod
def on_timeout(msg):
def _f(): raise TimeoutException(msg)
return _f
response = self.send_request(authenticate=False, auth=('wrong', 'wrong'), parse_response=False)
self.assertEqual(expected_login_redirect, response.url, 'A request with wrong credentials should fail')
def send_request(self, **kwargs):
set_timeout(seconds=self.timeout, on_timeout=self.on_timeout('Receiver response timed out'))
response = requests.post(
'{}/execute'.format(self.base_url),
json={
'type': 'request',
'target': Config.get('device_id'),
'action': 'shell.exec',
'args': {'cmd': 'echo ping'}
}, **kwargs
)
clear_timeout()
return response
def register_user(self):
set_timeout(seconds=self.timeout, on_timeout=self.on_timeout('User registration response timed out'))
response = requests.post('{base_url}/register?redirect={base_url}/'.format(base_url=self.base_url), data={
'username': self.test_user,
'password': self.test_pass,
'confirm_password': self.test_pass,
})
clear_timeout()
return response
@staticmethod
def parse_response(response):
return Message.build(response.json())
def tearDown(self):
try:
self.stop_daemon()
finally:
if os.path.isfile(self.db_file):
os.unlink(self.db_file)
return super().send_request('shell.exec', args={'cmd': 'echo ping'}, **kwargs)
if __name__ == '__main__':

58
tests/test_procedure.py Normal file
View file

@ -0,0 +1,58 @@
import os
import tempfile
import time
import unittest
from platypush.message.event.custom import CustomEvent
from . import BaseHttpTest, conf_dir
class TestProcedure(BaseHttpTest):
"""
Test the execution of configured procedures.
"""
config_file = os.path.join(conf_dir, 'test_procedure_config.yaml')
def setUp(self) -> None:
super().setUp()
self.register_user()
self.tmp_file = tempfile.NamedTemporaryFile(prefix='platypush-test-procedure-', suffix='.txt', delete=False)
def tearDown(self):
if os.path.isfile(self.tmp_file.name):
os.unlink(self.tmp_file.name)
super().tearDown()
def check_file_content(self, expected_content: str):
self.assertTrue(os.path.isfile(self.tmp_file.name), 'The expected output file was not created')
with open(self.tmp_file.name, 'r') as f:
content = f.read()
self.assertEqual(content, expected_content, 'The output file did not contain the expected text',
expected=expected_content, actual=content)
def test_procedure_call(self):
output_text = 'Procedure test'
self.send_request(
action='procedure.write_file',
args={
'file': self.tmp_file.name,
'content': output_text,
})
self.check_file_content(expected_content=output_text)
def test_procedure_from_event(self):
output_text = 'Procedure from event test'
event_type = 'platypush_test_procedure_from_event'
self.app.bus.post(CustomEvent(subtype=event_type, file=self.tmp_file.name, content=output_text))
time.sleep(3)
self.check_file_content(output_text)
if __name__ == '__main__':
unittest.main()
# vim:sw=4:ts=4:et: