Added proper support for ESP file upload/download (closes #110)

This commit is contained in:
Fabio Manganiello 2020-01-20 11:47:10 +01:00
parent b7d9917d1d
commit 4b56431e2a
4 changed files with 265 additions and 60 deletions

View file

@ -121,6 +121,10 @@ class EspPlugin(Plugin):
def on_message(self, conn: Connection): def on_message(self, conn: Connection):
def handler(ws, msg): def handler(ws, msg):
if not isinstance(msg, str):
# Bytes sequences will be handled by on_data
return
if msg.endswith('Password: ') and conn.state == conn.State.CONNECTED: if msg.endswith('Password: ') and conn.state == conn.State.CONNECTED:
conn.on_password_requested() conn.on_password_requested()
return return
@ -153,6 +157,34 @@ class EspPlugin(Plugin):
return callback return callback
def on_data(self, conn: Connection):
# noinspection PyUnusedLocal
def handler(ws, data):
if conn.state == conn.State.WAITING_FILE_TRANSFER_RESPONSE:
conn.on_recv_file_transfer_response(data)
return
if conn.state == conn.State.UPLOADING_FILE:
conn.on_file_transfer_completed(data)
return
if conn.state == conn.State.DOWNLOADING_FILE:
conn.on_chunk_received(data)
return
# noinspection PyUnusedLocal
def callback(ws, data, data_type, continue_flag):
import websocket
try:
if data_type == websocket.ABNF.OPCODE_BINARY:
handler(ws, data)
except Exception as e:
self.logger.exception(e)
raise e
return callback
def on_close(self, conn: Connection): def on_close(self, conn: Connection):
def callback(ws): def callback(ws):
# noinspection PyBroadException # noinspection PyBroadException
@ -186,6 +218,18 @@ class EspPlugin(Plugin):
return Device(host=host, port=port, password=password) return Device(host=host, port=port, password=password)
# noinspection PyUnusedLocal
def _get_connection(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, **kwargs) \
-> Connection:
if device:
assert device in self._devices_by_name, 'No such device configured: ' + device
device = self._devices_by_name[device]
host = device['host']
port = device['port']
assert host and port, 'No host and port specified'
return self._connections.get((host, port))
@action @action
def connect(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, def connect(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266,
password: Optional[str] = None, timeout: Optional[float] = 10.0): password: Optional[str] = None, timeout: Optional[float] = 10.0):
@ -201,7 +245,9 @@ class EspPlugin(Plugin):
import websocket import websocket
device = self._get_device(device=device, host=host, port=port, password=password) device = self._get_device(device=device, host=host, port=port, password=password)
conn = self._connections.get((device['host'], device['port'])) host = device['host']
port = device['port']
conn = self._get_connection(host=host, port=port)
if conn and conn.ws and conn.ws.sock.connected: if conn and conn.ws and conn.ws.sock.connected:
self.logger.info('Already connected to {}:{}'.format(host, port)) self.logger.info('Already connected to {}:{}'.format(host, port))
return return
@ -212,6 +258,7 @@ class EspPlugin(Plugin):
ws = websocket.WebSocketApp('ws://{host}:{port}'.format(host=host, port=port), ws = websocket.WebSocketApp('ws://{host}:{port}'.format(host=host, port=port),
on_open=self.on_open(conn), on_open=self.on_open(conn),
on_message=self.on_message(conn), on_message=self.on_message(conn),
on_data=self.on_data(conn),
on_error=self.on_error(conn), on_error=self.on_error(conn),
on_close=self.on_close(conn)) on_close=self.on_close(conn))
@ -246,6 +293,7 @@ class EspPlugin(Plugin):
password: Optional[str] = None, password: Optional[str] = None,
conn_timeout: Optional[float] = 10.0, conn_timeout: Optional[float] = 10.0,
recv_timeout: Optional[float] = 30.0, recv_timeout: Optional[float] = 30.0,
wait_response: bool = True,
**kwargs) -> Response: **kwargs) -> Response:
""" """
Run raw Python code on the ESP device. Run raw Python code on the ESP device.
@ -257,6 +305,7 @@ class EspPlugin(Plugin):
:param password: ESP WebREPL password. :param password: ESP WebREPL password.
:param conn_timeout: Connection timeout (default: 10 seconds). :param conn_timeout: Connection timeout (default: 10 seconds).
:param recv_timeout: Response receive timeout (default: 30 seconds). :param recv_timeout: Response receive timeout (default: 30 seconds).
:param wait_response: Wait for the response from the device (default: True)
:return: The response returned by the Micropython interpreter, as a string. :return: The response returned by the Micropython interpreter, as a string.
""" """
device = self._get_device(device=device, host=host, port=port, password=password) device = self._get_device(device=device, host=host, port=port, password=password)
@ -264,7 +313,7 @@ class EspPlugin(Plugin):
conn = self._connections.get((device['host'], device['port'])) conn = self._connections.get((device['host'], device['port']))
try: try:
return conn.send(code, timeout=recv_timeout) return conn.send(code, timeout=recv_timeout, wait_response=wait_response)
except Exception as e: except Exception as e:
conn.close() conn.close()
raise e raise e
@ -855,7 +904,7 @@ machine.freq({freq})
import machine import machine
machine.reset() machine.reset()
''' '''
return self.execute(code, **kwargs).output return self.execute(code, wait_response=False, **kwargs).output
@action @action
def soft_reset(self, **kwargs): def soft_reset(self, **kwargs):
@ -867,7 +916,7 @@ machine.reset()
import machine import machine
machine.soft_reset() machine.soft_reset()
''' '''
return self.execute(code, **kwargs).output return self.execute(code, wait_response=False, **kwargs).output
@action @action
def disable_irq(self, **kwargs): def disable_irq(self, **kwargs):
@ -908,7 +957,7 @@ import time
time.sleep({sec}) time.sleep({sec})
'''.format(sec=seconds) '''.format(sec=seconds)
return self.execute(code, **kwargs).output return self.execute(code, wait_response=False, **kwargs).output
@action @action
def soft_sleep(self, seconds: Optional[float] = None, **kwargs): def soft_sleep(self, seconds: Optional[float] = None, **kwargs):
@ -925,7 +974,7 @@ import machine
machine.lightsleep({msec}) machine.lightsleep({msec})
'''.format(msec=int(seconds * 1000) if seconds else '') '''.format(msec=int(seconds * 1000) if seconds else '')
return self.execute(code, **kwargs).output return self.execute(code, wait_response=False, **kwargs).output
@action @action
def deep_sleep(self, seconds: Optional[float] = None, **kwargs): def deep_sleep(self, seconds: Optional[float] = None, **kwargs):
@ -942,7 +991,7 @@ import machine
machine.deepsleep({msec}) machine.deepsleep({msec})
'''.format(msec=int(seconds * 1000) if seconds else '') '''.format(msec=int(seconds * 1000) if seconds else '')
return self.execute(code, **kwargs).output return self.execute(code, wait_response=False, **kwargs).output
@action @action
def unique_id(self, **kwargs) -> str: def unique_id(self, **kwargs) -> str:
@ -1501,51 +1550,47 @@ with open('{file}', 'r') as f:
return self.execute(code, **kwargs).output return self.execute(code, **kwargs).output
@action @action
def file_upload(self, file: str, target: Optional[str] = None, **kwargs): def file_upload(self, source: str, destination: Optional[str] = None, timeout: Optional[float] = 60.0, **kwargs):
""" """
Upload a file to the board. Upload a file to the board.
NOTE: It only works with non-binary files.
:param file: Local file name/path to copy. :param source: Path of the local file to copy.
:param target: Target file name/path (default: a filename will be created under the board's :param destination: Target file name (default: a filename will be created under the board's
root folder with the same name as the source file). root folder with the same name as the source file).
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. :param timeout: File transfer timeout (default: one minute).
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.connect`.
""" """
file = os.path.abspath(os.path.expanduser(file)) device = self._get_device(**kwargs)
with open(file, 'r') as f: host = device['host']
content = f.read() port = device['port']
self.connect(host=host, port=port, password=device['password'])
if not target: conn = self._get_connection(host=host, port=port)
target = os.path.basename(file) conn.file_upload(source=source, destination=destination, timeout=timeout)
code = '''
content = """{content}"""
with open('{target}', 'w') as f:
f.write(content)
'''.format(content=content, target=target)
return self.execute(code, **kwargs).output
@action @action
def file_download(self, file: str, target: str, **kwargs): def file_download(self, source: str, destination: str, timeout: Optional[float] = 60.0, **kwargs):
""" """
Download a file from the board to the local machine. Download a file from the board to the local machine.
NOTE: It only works with non-binary files. NOTE: It only works with non-binary files.
:param file: File name/path to get from the device. :param source: Name or path of the file to download from the device.
:param target: Target directory or file path on the local machine. :param destination: Target directory or path on the local machine.
:param timeout: File transfer timeout (default: one minute).
:param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`.
""" """
target = os.path.abspath(os.path.expanduser(target)) destination = os.path.abspath(os.path.expanduser(destination))
if os.path.isdir(target): if os.path.isdir(destination):
filename = os.path.basename(file) filename = os.path.basename(source)
target = os.path.join(target, filename) destination = os.path.join(destination, filename)
# noinspection PyUnresolvedReferences device = self._get_device(**kwargs)
content = self.file_get(file, **kwargs).output host = device['host']
with open(target, 'w') as f: port = device['port']
f.write(content) self.connect(host=host, port=port, password=device['password'])
conn = self._get_connection(host=host, port=port)
with open(destination, 'wb') as f:
conn.file_download(source, f, timeout=timeout)
def _dht_get_value(self, pin: Union[int, str], dht_type: int, value: str, **kwargs) -> float: def _dht_get_value(self, pin: Union[int, str], dht_type: int, value: str, **kwargs) -> float:
device = self._get_device(**kwargs) device = self._get_device(**kwargs)

View file

@ -1,6 +1,9 @@
import enum import enum
import logging
import os
import re import re
import threading import threading
import websocket
from typing import Optional, Union from typing import Optional, Union
from platypush.utils import grouper from platypush.utils import grouper
@ -11,6 +14,8 @@ class Connection:
This class models the connection with an ESP8266/ESP32 device over its WebREPL websocket channel. This class models the connection with an ESP8266/ESP32 device over its WebREPL websocket channel.
""" """
_file_transfer_buffer_size = 1024
class State(enum.IntEnum): class State(enum.IntEnum):
DISCONNECTED = 1 DISCONNECTED = 1
CONNECTED = 2 CONNECTED = 2
@ -19,9 +24,17 @@ class Connection:
SENDING_REQUEST = 5 SENDING_REQUEST = 5
WAITING_ECHO = 6 WAITING_ECHO = 6
WAITING_RESPONSE = 7 WAITING_RESPONSE = 7
SENDING_FILE_TRANSFER_RESPONSE = 8
WAITING_FILE_TRANSFER_RESPONSE = 9
UPLOADING_FILE = 10
DOWNLOADING_FILE = 11
class FileRequestType(enum.IntEnum):
UPLOAD = 1
DOWNLOAD = 2
def __init__(self, host: str, port: int, connect_timeout: Optional[float] = None, def __init__(self, host: str, port: int, connect_timeout: Optional[float] = None,
password: Optional[str] = None, ws=None): password: Optional[str] = None, ws: Optional[websocket.WebSocketApp] = None):
self.host = host self.host = host
self.port = port self.port = port
self.connect_timeout = connect_timeout self.connect_timeout = connect_timeout
@ -32,11 +45,18 @@ class Connection:
self._logged_in = threading.Event() self._logged_in = threading.Event()
self._echo_received = threading.Event() self._echo_received = threading.Event()
self._response_received = threading.Event() self._response_received = threading.Event()
self._download_chunk_ready = threading.Event()
self._file_transfer_request_ack_received = threading.Event()
self._file_transfer_ack_received = threading.Event()
self._file_transfer_request_successful = True
self._file_transfer_successful = True
self._downloaded_chunk = None
self._password_requested = False self._password_requested = False
self._running_cmd = None self._running_cmd = None
self._received_echo = None self._received_echo = None
self._received_response = None self._received_response = None
self._paste_header_received = False self._paste_header_received = False
self.logger = logging.getLogger(__name__)
def send(self, msg: Union[str, bytes], wait_response: bool = True, timeout: Optional[float] = None): def send(self, msg: Union[str, bytes], wait_response: bool = True, timeout: Optional[float] = None):
bufsize = 255 bufsize = 255
@ -59,13 +79,15 @@ class Connection:
self.state = self.State.SENDING_REQUEST self.state = self.State.SENDING_REQUEST
self._running_cmd = msg.decode().strip() self._running_cmd = msg.decode().strip()
self._received_echo = '' self._received_echo = ''
self._response_received.clear() self._response_received.clear()
self._echo_received.clear() self._echo_received.clear()
for chunk in grouper(bufsize, msg): for chunk in grouper(bufsize, msg):
self.ws.send(bytes(chunk)) self.ws.send(bytes(chunk))
if wait_response: if not wait_response:
return
self.state = self.State.WAITING_ECHO self.state = self.State.WAITING_ECHO
echo_received = self._echo_received.wait(timeout=timeout) echo_received = self._echo_received.wait(timeout=timeout)
if not echo_received: if not echo_received:
@ -164,6 +186,66 @@ class Connection:
# Notify the listeners # Notify the listeners
self._response_received.set() self._response_received.set()
def on_recv_file_transfer_response(self, response):
self._file_transfer_request_successful = self._parse_file_transfer_response(response)
self._file_transfer_request_ack_received.set()
def on_file_transfer_completed(self, response):
self._file_transfer_successful = self._parse_file_transfer_response(response)
self._file_transfer_ack_received.set()
def on_file_upload_start(self):
self.logger.info('Starting file upload')
self._file_transfer_successful = False
self._file_transfer_ack_received.clear()
self.state = self.State.UPLOADING_FILE
def on_file_download_start(self):
self.logger.info('Starting file download')
self._file_transfer_successful = False
self._downloaded_chunk = None
self.state = self.State.DOWNLOADING_FILE
self._file_transfer_ack_received.clear()
self.ws.send(b'\x00', opcode=websocket.ABNF.OPCODE_BINARY)
def on_chunk_received(self, data):
size = data[0] | (data[1] << 8)
data = data[2:]
if len(data) != size:
return
self.logger.info('Received chunk of size {} (total size={})'.format(len(data), size))
if size == 0:
# End of file
self.logger.info('File download completed')
self._downloaded_chunk = None
self.on_file_download_completed()
else:
self._downloaded_chunk = data
self.ws.send(b'\x00', opcode=websocket.ABNF.OPCODE_BINARY)
self._download_chunk_ready.set()
self._download_chunk_ready.clear()
def on_file_download_completed(self):
self.state = self.State.READY
def on_file_transfer_request(self):
self.logger.info('Sending file transfer request')
self._file_transfer_request_successful = False
self._file_transfer_request_ack_received.clear()
self.state = self.State.SENDING_FILE_TRANSFER_RESPONSE
def get_downloaded_chunks(self, timeout: Optional[float] = None):
block_ok = self._download_chunk_ready.wait(timeout=timeout)
while self._downloaded_chunk and block_ok:
yield self._downloaded_chunk
block_ok = self._download_chunk_ready.wait(timeout=timeout)
if not block_ok:
self.on_timeout('File download timed out')
def wait_ready(self): def wait_ready(self):
connected = self._connected.wait(timeout=self.connect_timeout) connected = self._connected.wait(timeout=self.connect_timeout)
if not connected: if not connected:
@ -173,5 +255,85 @@ class Connection:
if not logged_in: if not logged_in:
self.on_timeout('Log in timed out') self.on_timeout('Log in timed out')
def wait_file_request_ack_received(self, timeout):
self.state = self.State.WAITING_FILE_TRANSFER_RESPONSE
self._file_transfer_request_ack_received.wait(timeout=timeout)
assert self._file_transfer_request_successful, 'File transfer request failed'
self.logger.info('File transfer request acknowledged')
def wait_file_transfer_completed(self, timeout):
self._file_transfer_ack_received.wait(timeout)
assert self._file_transfer_successful, 'File transfer failed'
self.logger.info('File transfer completed')
@staticmethod
def _parse_file_transfer_response(response: bytes) -> bool:
if not response or len(response) < 4:
return False
if response[0] == ord('W') and response[1] == ord('B'):
return response[2] | response[3] << 8 == 0
return False
def _send_file_request(self, filename: str, request_type: FileRequestType, file_size: int = 0,
timeout: Optional[float] = None):
self.on_file_transfer_request()
# 2 + 1 + 1 + 8 + 4 + 2 + 64
request = bytearray(82)
# Protocol mode (file transfer) and request type (1=PUT, 2=GET)
request[0] = ord('W')
request[1] = ord('A')
request[2] = request_type.value
# File size encoding
request[12] = file_size & 0xff
request[13] = (file_size >> 8) & 0xff
request[14] = (file_size >> 16) & 0xff
request[15] = (file_size >> 24) & 0xff
# File name length encoding
request[16] = len(filename) & 0xff
request[17] = (len(filename) >> 8) & 0xff
# File name encoding
for i in range(0, min(64, len(filename))):
request[i+18] = ord(filename[i])
self.ws.send(request, opcode=websocket.ABNF.OPCODE_BINARY)
self.wait_file_request_ack_received(timeout=timeout)
def _upload_file(self, f, timeout):
self.on_file_upload_start()
content = f.read(self._file_transfer_buffer_size)
while content:
self.ws.send(content, opcode=websocket.ABNF.OPCODE_BINARY)
content = f.read(self._file_transfer_buffer_size)
self.wait_file_transfer_completed(timeout=timeout)
self.state = self.State.READY
def _download_file(self, f, timeout):
self.on_file_download_start()
for chunk in self.get_downloaded_chunks(timeout=timeout):
f.write(chunk)
self.on_file_download_completed()
def file_upload(self, source, destination, timeout):
source = os.path.abspath(os.path.expanduser(source))
destination = os.path.join(destination, os.path.basename(source)) if destination else os.path.basename(source)
size = os.path.getsize(source)
with open(source, 'rb') as f:
self._send_file_request(destination, self.FileRequestType.UPLOAD, file_size=size, timeout=timeout)
self._upload_file(f, timeout=timeout)
def file_download(self, file, fd, timeout=None):
self._send_file_request(file, request_type=self.FileRequestType.DOWNLOAD, timeout=timeout)
self._download_file(fd, timeout=timeout)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -132,7 +132,7 @@ paho-mqtt
# git+https://github.com/agonzalezro/python-opensubtitles # git+https://github.com/agonzalezro/python-opensubtitles
# webvtt-py # webvtt-py
# Mopidy backend # Multiple plugins and backends require the websocket client
websocket-client websocket-client
# mpv player plugin # mpv player plugin
@ -224,5 +224,3 @@ croniter
# Support for nmap integration # Support for nmap integration
# python-nmap # python-nmap
# Support for ESP8266/ESP32 Micropython integration
# websocket-client

View file

@ -144,6 +144,8 @@ setup(
'croniter', 'croniter',
'pyScss', 'pyScss',
'sqlalchemy', 'sqlalchemy',
'websockets',
'websocket-client',
], ],
extras_require={ extras_require={
@ -154,9 +156,9 @@ setup(
# Support for Pushbullet backend and plugin # Support for Pushbullet backend and plugin
'pushbullet': ['pushbullet.py @ https://github.com/rbrcsk/pushbullet.py'], 'pushbullet': ['pushbullet.py @ https://github.com/rbrcsk/pushbullet.py'],
# Support for HTTP backend # Support for HTTP backend
'http': ['flask', 'websockets', 'python-dateutil', 'tz', 'frozendict', 'bcrypt'], 'http': ['flask', 'python-dateutil', 'tz', 'frozendict', 'bcrypt'],
# Support for uWSGI HTTP backend # Support for uWSGI HTTP backend
'uwsgi': ['flask', 'websockets', 'python-dateutil', 'tz', 'frozendict', 'uwsgi', 'bcrypt'], 'uwsgi': ['flask', 'python-dateutil', 'tz', 'frozendict', 'uwsgi', 'bcrypt'],
# Support for database # Support for database
'db': ['sqlalchemy'], 'db': ['sqlalchemy'],
# Support for MQTT backends # Support for MQTT backends
@ -168,7 +170,7 @@ setup(
# Support for Philips Hue plugin # Support for Philips Hue plugin
'hue': ['phue'], 'hue': ['phue'],
# Support for MPD/Mopidy music server plugin and backend # Support for MPD/Mopidy music server plugin and backend
'mpd': ['python-mpd2', 'websocket-client'], 'mpd': ['python-mpd2'],
# Support for text2speech plugin # Support for text2speech plugin
'tts': ['mplayer'], 'tts': ['mplayer'],
# Support for Google text2speech plugin # Support for Google text2speech plugin
@ -277,7 +279,5 @@ setup(
'sys': ['py-cpuinfo', 'psutil'], 'sys': ['py-cpuinfo', 'psutil'],
# Support for nmap integration # Support for nmap integration
'nmap': ['python-nmap'], 'nmap': ['python-nmap'],
# Support for ESP8266/ESP32 Micropython integration
'esp': ['websocket-client'],
}, },
) )