80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
from .context import platypush, config_file, TestTimeoutException
|
|
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import unittest
|
|
|
|
from threading import Thread
|
|
|
|
from platypush import Daemon
|
|
from platypush.config import Config
|
|
from platypush.pusher import Pusher
|
|
from platypush.utils import set_timeout, clear_timeout
|
|
|
|
class TestLocal(unittest.TestCase):
|
|
""" Tests the full flow on a local backend by executing a command through
|
|
the shell.exec plugin and getting the output """
|
|
|
|
timeout = 5
|
|
|
|
def setUp(self):
|
|
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
|
|
backends = Config.get_backends()
|
|
self.assertTrue('local' in backends)
|
|
|
|
try: os.remove(Config.get_backends()['local']['request_fifo'])
|
|
except FileNotFoundError as e: pass
|
|
|
|
try: os.remove(Config.get_backends()['local']['response_fifo'])
|
|
except FileNotFoundError as e: pass
|
|
|
|
|
|
def test_local_shell_exec_flow(self):
|
|
self.start_sender()
|
|
self.start_receiver()
|
|
|
|
def on_response(self):
|
|
def _f(response):
|
|
logging.info("Received response: {}".format(response))
|
|
clear_timeout()
|
|
self.assertEqual(response.output.strip(), 'ping')
|
|
return _f
|
|
|
|
def on_timeout(self, msg):
|
|
def _f(): raise TestTimeoutException(msg)
|
|
return _f
|
|
|
|
def start_sender(self):
|
|
def _run_sender():
|
|
pusher = Pusher(config_file=config_file, backend='local',
|
|
on_response=self.on_response())
|
|
|
|
logging.info('Sending request')
|
|
pusher.push(target=Config.get('device_id'), action='shell.exec',
|
|
cmd='echo ping', timeout=None)
|
|
|
|
|
|
# Start the sender thread and wait for a response
|
|
set_timeout(seconds=self.timeout,
|
|
on_timeout=self.on_timeout('Receiver response timed out'))
|
|
|
|
self.sender = Thread(target=_run_sender)
|
|
self.sender.start()
|
|
|
|
def start_receiver(self):
|
|
set_timeout(seconds=self.timeout,
|
|
on_timeout=self.on_timeout('Sender request timed out'))
|
|
|
|
self.receiver = Daemon(config_file=config_file, requests_to_process=1)
|
|
self.receiver.start()
|
|
|
|
time.sleep(1)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|
|
# vim:sw=4:ts=4:et:
|
|
|