diff --git a/platypush/plugins/esp/__init__.py b/platypush/plugins/esp/__init__.py index fc1edab7..fe646fe6 100644 --- a/platypush/plugins/esp/__init__.py +++ b/platypush/plugins/esp/__init__.py @@ -121,6 +121,10 @@ class EspPlugin(Plugin): def on_message(self, conn: Connection): def handler(ws, msg): + if not isinstance(msg, str): + # Bytes sequences will be handled by on_data + return + if msg.endswith('Password: ') and conn.state == conn.State.CONNECTED: conn.on_password_requested() return @@ -153,6 +157,34 @@ class EspPlugin(Plugin): return callback + def on_data(self, conn: Connection): + # noinspection PyUnusedLocal + def handler(ws, data): + if conn.state == conn.State.WAITING_FILE_TRANSFER_RESPONSE: + conn.on_recv_file_transfer_response(data) + return + + if conn.state == conn.State.UPLOADING_FILE: + conn.on_file_transfer_completed(data) + return + + if conn.state == conn.State.DOWNLOADING_FILE: + conn.on_chunk_received(data) + return + + # noinspection PyUnusedLocal + def callback(ws, data, data_type, continue_flag): + import websocket + + try: + if data_type == websocket.ABNF.OPCODE_BINARY: + handler(ws, data) + except Exception as e: + self.logger.exception(e) + raise e + + return callback + def on_close(self, conn: Connection): def callback(ws): # noinspection PyBroadException @@ -186,6 +218,18 @@ class EspPlugin(Plugin): return Device(host=host, port=port, password=password) + # noinspection PyUnusedLocal + def _get_connection(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, **kwargs) \ + -> Connection: + if device: + assert device in self._devices_by_name, 'No such device configured: ' + device + device = self._devices_by_name[device] + host = device['host'] + port = device['port'] + + assert host and port, 'No host and port specified' + return self._connections.get((host, port)) + @action def connect(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, password: Optional[str] = None, timeout: Optional[float] = 10.0): @@ -201,7 +245,9 @@ class EspPlugin(Plugin): import websocket device = self._get_device(device=device, host=host, port=port, password=password) - conn = self._connections.get((device['host'], device['port'])) + host = device['host'] + port = device['port'] + conn = self._get_connection(host=host, port=port) if conn and conn.ws and conn.ws.sock.connected: self.logger.info('Already connected to {}:{}'.format(host, port)) return @@ -212,6 +258,7 @@ class EspPlugin(Plugin): ws = websocket.WebSocketApp('ws://{host}:{port}'.format(host=host, port=port), on_open=self.on_open(conn), on_message=self.on_message(conn), + on_data=self.on_data(conn), on_error=self.on_error(conn), on_close=self.on_close(conn)) @@ -246,6 +293,7 @@ class EspPlugin(Plugin): password: Optional[str] = None, conn_timeout: Optional[float] = 10.0, recv_timeout: Optional[float] = 30.0, + wait_response: bool = True, **kwargs) -> Response: """ Run raw Python code on the ESP device. @@ -257,6 +305,7 @@ class EspPlugin(Plugin): :param password: ESP WebREPL password. :param conn_timeout: Connection timeout (default: 10 seconds). :param recv_timeout: Response receive timeout (default: 30 seconds). + :param wait_response: Wait for the response from the device (default: True) :return: The response returned by the Micropython interpreter, as a string. """ device = self._get_device(device=device, host=host, port=port, password=password) @@ -264,7 +313,7 @@ class EspPlugin(Plugin): conn = self._connections.get((device['host'], device['port'])) try: - return conn.send(code, timeout=recv_timeout) + return conn.send(code, timeout=recv_timeout, wait_response=wait_response) except Exception as e: conn.close() raise e @@ -855,7 +904,7 @@ machine.freq({freq}) import machine machine.reset() ''' - return self.execute(code, **kwargs).output + return self.execute(code, wait_response=False, **kwargs).output @action def soft_reset(self, **kwargs): @@ -867,7 +916,7 @@ machine.reset() import machine machine.soft_reset() ''' - return self.execute(code, **kwargs).output + return self.execute(code, wait_response=False, **kwargs).output @action def disable_irq(self, **kwargs): @@ -908,7 +957,7 @@ import time time.sleep({sec}) '''.format(sec=seconds) - return self.execute(code, **kwargs).output + return self.execute(code, wait_response=False, **kwargs).output @action def soft_sleep(self, seconds: Optional[float] = None, **kwargs): @@ -925,7 +974,7 @@ import machine machine.lightsleep({msec}) '''.format(msec=int(seconds * 1000) if seconds else '') - return self.execute(code, **kwargs).output + return self.execute(code, wait_response=False, **kwargs).output @action def deep_sleep(self, seconds: Optional[float] = None, **kwargs): @@ -942,7 +991,7 @@ import machine machine.deepsleep({msec}) '''.format(msec=int(seconds * 1000) if seconds else '') - return self.execute(code, **kwargs).output + return self.execute(code, wait_response=False, **kwargs).output @action def unique_id(self, **kwargs) -> str: @@ -1501,51 +1550,47 @@ with open('{file}', 'r') as f: return self.execute(code, **kwargs).output @action - def file_upload(self, file: str, target: Optional[str] = None, **kwargs): + def file_upload(self, source: str, destination: Optional[str] = None, timeout: Optional[float] = 60.0, **kwargs): """ Upload a file to the board. - NOTE: It only works with non-binary files. - :param file: Local file name/path to copy. - :param target: Target file name/path (default: a filename will be created under the board's + :param source: Path of the local file to copy. + :param destination: Target file name (default: a filename will be created under the board's root folder with the same name as the source file). - :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. + :param timeout: File transfer timeout (default: one minute). + :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.connect`. """ - file = os.path.abspath(os.path.expanduser(file)) - with open(file, 'r') as f: - content = f.read() - - if not target: - target = os.path.basename(file) - - code = ''' -content = """{content}""" - -with open('{target}', 'w') as f: - f.write(content) -'''.format(content=content, target=target) - - return self.execute(code, **kwargs).output + device = self._get_device(**kwargs) + host = device['host'] + port = device['port'] + self.connect(host=host, port=port, password=device['password']) + conn = self._get_connection(host=host, port=port) + conn.file_upload(source=source, destination=destination, timeout=timeout) @action - def file_download(self, file: str, target: str, **kwargs): + def file_download(self, source: str, destination: str, timeout: Optional[float] = 60.0, **kwargs): """ Download a file from the board to the local machine. NOTE: It only works with non-binary files. - :param file: File name/path to get from the device. - :param target: Target directory or file path on the local machine. + :param source: Name or path of the file to download from the device. + :param destination: Target directory or path on the local machine. + :param timeout: File transfer timeout (default: one minute). :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - target = os.path.abspath(os.path.expanduser(target)) - if os.path.isdir(target): - filename = os.path.basename(file) - target = os.path.join(target, filename) + destination = os.path.abspath(os.path.expanduser(destination)) + if os.path.isdir(destination): + filename = os.path.basename(source) + destination = os.path.join(destination, filename) - # noinspection PyUnresolvedReferences - content = self.file_get(file, **kwargs).output - with open(target, 'w') as f: - f.write(content) + device = self._get_device(**kwargs) + host = device['host'] + port = device['port'] + self.connect(host=host, port=port, password=device['password']) + conn = self._get_connection(host=host, port=port) + + with open(destination, 'wb') as f: + conn.file_download(source, f, timeout=timeout) def _dht_get_value(self, pin: Union[int, str], dht_type: int, value: str, **kwargs) -> float: device = self._get_device(**kwargs) diff --git a/platypush/plugins/esp/models/connection.py b/platypush/plugins/esp/models/connection.py index f7b85274..7fc225c0 100644 --- a/platypush/plugins/esp/models/connection.py +++ b/platypush/plugins/esp/models/connection.py @@ -1,6 +1,9 @@ import enum +import logging +import os import re import threading +import websocket from typing import Optional, Union from platypush.utils import grouper @@ -11,6 +14,8 @@ class Connection: This class models the connection with an ESP8266/ESP32 device over its WebREPL websocket channel. """ + _file_transfer_buffer_size = 1024 + class State(enum.IntEnum): DISCONNECTED = 1 CONNECTED = 2 @@ -19,9 +24,17 @@ class Connection: SENDING_REQUEST = 5 WAITING_ECHO = 6 WAITING_RESPONSE = 7 + SENDING_FILE_TRANSFER_RESPONSE = 8 + WAITING_FILE_TRANSFER_RESPONSE = 9 + UPLOADING_FILE = 10 + DOWNLOADING_FILE = 11 + + class FileRequestType(enum.IntEnum): + UPLOAD = 1 + DOWNLOAD = 2 def __init__(self, host: str, port: int, connect_timeout: Optional[float] = None, - password: Optional[str] = None, ws=None): + password: Optional[str] = None, ws: Optional[websocket.WebSocketApp] = None): self.host = host self.port = port self.connect_timeout = connect_timeout @@ -32,11 +45,18 @@ class Connection: self._logged_in = threading.Event() self._echo_received = threading.Event() self._response_received = threading.Event() + self._download_chunk_ready = threading.Event() + self._file_transfer_request_ack_received = threading.Event() + self._file_transfer_ack_received = threading.Event() + self._file_transfer_request_successful = True + self._file_transfer_successful = True + self._downloaded_chunk = None self._password_requested = False self._running_cmd = None self._received_echo = None self._received_response = None self._paste_header_received = False + self.logger = logging.getLogger(__name__) def send(self, msg: Union[str, bytes], wait_response: bool = True, timeout: Optional[float] = None): bufsize = 255 @@ -59,26 +79,28 @@ class Connection: self.state = self.State.SENDING_REQUEST self._running_cmd = msg.decode().strip() self._received_echo = '' + self._response_received.clear() - self._response_received.clear() self._echo_received.clear() for chunk in grouper(bufsize, msg): self.ws.send(bytes(chunk)) - if wait_response: - self.state = self.State.WAITING_ECHO - echo_received = self._echo_received.wait(timeout=timeout) - if not echo_received: - self.on_timeout('No response echo received') + if not wait_response: + return - self._paste_header_received = False - response_received = self._response_received.wait(timeout=timeout) - if not response_received: - self.on_timeout('No response received') + self.state = self.State.WAITING_ECHO + echo_received = self._echo_received.wait(timeout=timeout) + if not echo_received: + self.on_timeout('No response echo received') - response = self._received_response - self._received_response = None - return response + self._paste_header_received = False + response_received = self._response_received.wait(timeout=timeout) + if not response_received: + self.on_timeout('No response received') + + response = self._received_response + self._received_response = None + return response def on_connect(self): self.state = Connection.State.CONNECTED @@ -164,6 +186,66 @@ class Connection: # Notify the listeners self._response_received.set() + def on_recv_file_transfer_response(self, response): + self._file_transfer_request_successful = self._parse_file_transfer_response(response) + self._file_transfer_request_ack_received.set() + + def on_file_transfer_completed(self, response): + self._file_transfer_successful = self._parse_file_transfer_response(response) + self._file_transfer_ack_received.set() + + def on_file_upload_start(self): + self.logger.info('Starting file upload') + self._file_transfer_successful = False + self._file_transfer_ack_received.clear() + self.state = self.State.UPLOADING_FILE + + def on_file_download_start(self): + self.logger.info('Starting file download') + self._file_transfer_successful = False + self._downloaded_chunk = None + self.state = self.State.DOWNLOADING_FILE + self._file_transfer_ack_received.clear() + self.ws.send(b'\x00', opcode=websocket.ABNF.OPCODE_BINARY) + + def on_chunk_received(self, data): + size = data[0] | (data[1] << 8) + data = data[2:] + if len(data) != size: + return + + self.logger.info('Received chunk of size {} (total size={})'.format(len(data), size)) + if size == 0: + # End of file + self.logger.info('File download completed') + self._downloaded_chunk = None + self.on_file_download_completed() + else: + self._downloaded_chunk = data + self.ws.send(b'\x00', opcode=websocket.ABNF.OPCODE_BINARY) + + self._download_chunk_ready.set() + self._download_chunk_ready.clear() + + def on_file_download_completed(self): + self.state = self.State.READY + + def on_file_transfer_request(self): + self.logger.info('Sending file transfer request') + self._file_transfer_request_successful = False + self._file_transfer_request_ack_received.clear() + self.state = self.State.SENDING_FILE_TRANSFER_RESPONSE + + def get_downloaded_chunks(self, timeout: Optional[float] = None): + block_ok = self._download_chunk_ready.wait(timeout=timeout) + + while self._downloaded_chunk and block_ok: + yield self._downloaded_chunk + block_ok = self._download_chunk_ready.wait(timeout=timeout) + + if not block_ok: + self.on_timeout('File download timed out') + def wait_ready(self): connected = self._connected.wait(timeout=self.connect_timeout) if not connected: @@ -173,5 +255,85 @@ class Connection: if not logged_in: self.on_timeout('Log in timed out') + def wait_file_request_ack_received(self, timeout): + self.state = self.State.WAITING_FILE_TRANSFER_RESPONSE + self._file_transfer_request_ack_received.wait(timeout=timeout) + assert self._file_transfer_request_successful, 'File transfer request failed' + self.logger.info('File transfer request acknowledged') + + def wait_file_transfer_completed(self, timeout): + self._file_transfer_ack_received.wait(timeout) + assert self._file_transfer_successful, 'File transfer failed' + self.logger.info('File transfer completed') + + @staticmethod + def _parse_file_transfer_response(response: bytes) -> bool: + if not response or len(response) < 4: + return False + + if response[0] == ord('W') and response[1] == ord('B'): + return response[2] | response[3] << 8 == 0 + return False + + def _send_file_request(self, filename: str, request_type: FileRequestType, file_size: int = 0, + timeout: Optional[float] = None): + self.on_file_transfer_request() + + # 2 + 1 + 1 + 8 + 4 + 2 + 64 + request = bytearray(82) + + # Protocol mode (file transfer) and request type (1=PUT, 2=GET) + request[0] = ord('W') + request[1] = ord('A') + request[2] = request_type.value + + # File size encoding + request[12] = file_size & 0xff + request[13] = (file_size >> 8) & 0xff + request[14] = (file_size >> 16) & 0xff + request[15] = (file_size >> 24) & 0xff + + # File name length encoding + request[16] = len(filename) & 0xff + request[17] = (len(filename) >> 8) & 0xff + + # File name encoding + for i in range(0, min(64, len(filename))): + request[i+18] = ord(filename[i]) + + self.ws.send(request, opcode=websocket.ABNF.OPCODE_BINARY) + self.wait_file_request_ack_received(timeout=timeout) + + def _upload_file(self, f, timeout): + self.on_file_upload_start() + content = f.read(self._file_transfer_buffer_size) + + while content: + self.ws.send(content, opcode=websocket.ABNF.OPCODE_BINARY) + content = f.read(self._file_transfer_buffer_size) + + self.wait_file_transfer_completed(timeout=timeout) + self.state = self.State.READY + + def _download_file(self, f, timeout): + self.on_file_download_start() + for chunk in self.get_downloaded_chunks(timeout=timeout): + f.write(chunk) + + self.on_file_download_completed() + + def file_upload(self, source, destination, timeout): + source = os.path.abspath(os.path.expanduser(source)) + destination = os.path.join(destination, os.path.basename(source)) if destination else os.path.basename(source) + size = os.path.getsize(source) + + with open(source, 'rb') as f: + self._send_file_request(destination, self.FileRequestType.UPLOAD, file_size=size, timeout=timeout) + self._upload_file(f, timeout=timeout) + + def file_download(self, file, fd, timeout=None): + self._send_file_request(file, request_type=self.FileRequestType.DOWNLOAD, timeout=timeout) + self._download_file(fd, timeout=timeout) + # vim:sw=4:ts=4:et: diff --git a/requirements.txt b/requirements.txt index 206078ae..cdfce8c2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -132,7 +132,7 @@ paho-mqtt # git+https://github.com/agonzalezro/python-opensubtitles # webvtt-py -# Mopidy backend +# Multiple plugins and backends require the websocket client websocket-client # mpv player plugin @@ -224,5 +224,3 @@ croniter # Support for nmap integration # python-nmap -# Support for ESP8266/ESP32 Micropython integration -# websocket-client diff --git a/setup.py b/setup.py index 060752b2..7cfe1fce 100755 --- a/setup.py +++ b/setup.py @@ -144,6 +144,8 @@ setup( 'croniter', 'pyScss', 'sqlalchemy', + 'websockets', + 'websocket-client', ], extras_require={ @@ -154,9 +156,9 @@ setup( # Support for Pushbullet backend and plugin 'pushbullet': ['pushbullet.py @ https://github.com/rbrcsk/pushbullet.py'], # Support for HTTP backend - 'http': ['flask', 'websockets', 'python-dateutil', 'tz', 'frozendict', 'bcrypt'], + 'http': ['flask', 'python-dateutil', 'tz', 'frozendict', 'bcrypt'], # Support for uWSGI HTTP backend - 'uwsgi': ['flask', 'websockets', 'python-dateutil', 'tz', 'frozendict', 'uwsgi', 'bcrypt'], + 'uwsgi': ['flask', 'python-dateutil', 'tz', 'frozendict', 'uwsgi', 'bcrypt'], # Support for database 'db': ['sqlalchemy'], # Support for MQTT backends @@ -168,7 +170,7 @@ setup( # Support for Philips Hue plugin 'hue': ['phue'], # Support for MPD/Mopidy music server plugin and backend - 'mpd': ['python-mpd2', 'websocket-client'], + 'mpd': ['python-mpd2'], # Support for text2speech plugin 'tts': ['mplayer'], # Support for Google text2speech plugin @@ -277,7 +279,5 @@ setup( 'sys': ['py-cpuinfo', 'psutil'], # Support for nmap integration 'nmap': ['python-nmap'], - # Support for ESP8266/ESP32 Micropython integration - 'esp': ['websocket-client'], }, )