diff --git a/platypush/message/__init__.py b/platypush/message/__init__.py index 66a99b388..65106da2b 100644 --- a/platypush/message/__init__.py +++ b/platypush/message/__init__.py @@ -30,6 +30,11 @@ class Message: """ class Encoder(json.JSONEncoder): + """ + JSON encoder that can serialize custom types commonly handled in + Platypush messages. + """ + @staticmethod def parse_numpy(obj): try: @@ -57,50 +62,50 @@ class Message: if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): return obj.isoformat() - def default(self, obj): + def default(self, o): from platypush.procedure import Procedure - value = self.parse_datetime(obj) + value = self.parse_datetime(o) if value is not None: return value - if isinstance(obj, set): - return list(obj) + if isinstance(o, set): + return list(o) - if isinstance(obj, UUID): - return str(obj) + if isinstance(o, UUID): + return str(o) - value = self.parse_numpy(obj) + value = self.parse_numpy(o) if value is not None: return value - if isinstance(obj, JSONAble): - return obj.to_json() + if isinstance(o, JSONAble): + return o.to_json() - if isinstance(obj, Procedure): - return obj.to_dict() + if isinstance(o, Procedure): + return o.to_dict() - if isinstance(obj, Enum): - return obj.value + if isinstance(o, Enum): + return o.value - if isinstance(obj, Exception): - return f'<{obj.__class__.__name__}>' + (f' {obj}' if obj else '') + if isinstance(o, Exception): + return f'<{o.__class__.__name__}>' + (f' {o}' if o else '') - if is_dataclass(obj): - return asdict(obj) + if is_dataclass(o): + return asdict(o) - if isinstance(obj, Message): - return obj.to_dict(obj) + if isinstance(o, Message): + return o.to_dict(o) # Don't serialize I/O wrappers/objects - if isinstance(obj, io.IOBase): + if isinstance(o, io.IOBase): return None try: - return super().default(obj) + return super().default(o) except Exception as e: _logger.warning( - 'Could not serialize object type %s: %s: %s', type(obj), e, obj + 'Could not serialize object type %s: %s: %s', type(o), e, o ) def __init__(self, *_, timestamp=None, logging_level=logging.INFO, **__): @@ -205,62 +210,4 @@ class Message: return msgtype.build(msg) -class Mapping(dict): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - for k, v in kwargs.items(): - self.__setattr__(k, v) - - def __setitem__(self, key, item): - self.__dict__[key] = item - - def __getitem__(self, key): - return self.__dict__[key] - - def __repr__(self): - return repr(self.__dict__) - - def __len__(self): - return len(self.__dict__) - - def __delitem__(self, key): - del self.__dict__[key] - - def clear(self): - return self.__dict__.clear() - - def copy(self): - return self.__dict__.copy() - - def has_key(self, k): - return k in self.__dict__ - - def update(self, *args, **kwargs): - return self.__dict__.update(*args, **kwargs) - - def keys(self): - return self.__dict__.keys() - - def values(self): - return self.__dict__.values() - - def items(self): - return self.__dict__.items() - - def pop(self, *args): - return self.__dict__.pop(*args) - - def __cmp__(self, dict_): - return self.__cmp__(dict_) - - def __contains__(self, item): - return item in self.__dict__ - - def __iter__(self): - return iter(self.__dict__) - - def __str__(self): - return str(self.__dict__) - - # vim:sw=4:ts=4:et: diff --git a/platypush/message/response/esp.py b/platypush/message/response/esp.py deleted file mode 100644 index 5ebde2544..000000000 --- a/platypush/message/response/esp.py +++ /dev/null @@ -1,50 +0,0 @@ -from typing import Optional - -from platypush.message import Mapping - - -class EspWifiScanResult(Mapping): - def __init__(self, - essid: str, - bssid: str, - channel: int, - rssi: int, - auth_mode: int, - hidden: bool, - *args, - **kwargs): - self.essid = essid - self.bssid = bssid - self.channel = channel - self.rssi = rssi - self.auth_mode = auth_mode - self.hidden = hidden - super().__init__(*args, **dict(self), **kwargs) - - -class EspWifiConfigResult(Mapping): - def __init__(self, - ip: str, - netmask: str, - gateway: str, - dns: str, - mac: str, - active: bool, - essid: Optional[str] = None, - channel: Optional[int] = None, - hidden: Optional[bool] = None, - *args, - **kwargs): - self.ip = ip - self.netmask = netmask - self.gateway = gateway - self.dns = dns - self.mac = mac - self.active = active - self.essid = essid - self.channel = channel - self.hidden = hidden - super().__init__(*args, **dict(self), **kwargs) - - -# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/esp/__init__.py b/platypush/plugins/esp/__init__.py index ba328eb0c..d0ffa0337 100644 --- a/platypush/plugins/esp/__init__.py +++ b/platypush/plugins/esp/__init__.py @@ -7,63 +7,63 @@ from typing import Dict, Optional, List, Any, Union import websocket from platypush import Response -from platypush.message.response.esp import EspWifiScanResult, EspWifiConfigResult from platypush.plugins import Plugin, action from platypush.plugins.esp.models.connection import Connection from platypush.plugins.esp.models.device import Device +from platypush.schemas.esp import WifiConfigSchema, WifiScanResultSchema class EspPlugin(Plugin): - # noinspection PyUnresolvedReferences """ - This plugin allows you to fully control to ESP8266/ESP32 devices connected over WiFi. - It uses the WebREPL interface embedded in MicroPython to communicate with the device. + This plugin allows you to fully control to ESP8266/ESP32 devices connected over WiFi. + It uses the WebREPL interface embedded in MicroPython to communicate with the device. - All you need to do is to flash the MicroPython firmware to your device, enable the WebREPL interface, - and you can use this plugin to fully control the device remotely without deploying any code to the controller. + All you need to do is to flash the MicroPython firmware to your device, enable the WebREPL interface, + and you can use this plugin to fully control the device remotely without deploying any code to the controller. - - Download the `MicroPython firmware `_ for your device. - - Connect your ESP8266/ESP32 via serial/USB port and - `flash the firmware `_. - For example. using ``esptool`` and assuming that you have an ESP8266 device connected on ``/dev/ttyUSB0``: + - Download the `MicroPython firmware `_ for your device. + - Connect your ESP8266/ESP32 via serial/USB port and + `flash the firmware `_. + For example. using ``esptool`` and assuming that you have an ESP8266 device connected on ``/dev/ttyUSB0``: - .. code-block:: shell + .. code-block:: shell - # Erase the flash memory - esptool.py --port /dev/ttyUSB0 erase_flash - # Flash the firmware - esptool.py --port /dev/ttyUSB0 --baud 115200 write_flash --flash_size=detect 0 esp8266-[version].bin + # Erase the flash memory + esptool.py --port /dev/ttyUSB0 erase_flash + # Flash the firmware + esptool.py --port /dev/ttyUSB0 --baud 115200 write_flash --flash_size=detect 0 esp8266-[version].bin - - Access the MicroPython interpreter over serial/USB port. For example, on Linux: + - Access the MicroPython interpreter over serial/USB port. For example, on Linux: - .. code-block:: shell + .. code-block:: shell - picocom /dev/ttyUSB0 -b11520 + picocom /dev/ttyUSB0 -b11520 - - Configure the WiFi interface: + - Configure the WiFi interface: - .. code-block:: python + .. code-block:: python - >>> import network - >>> wlan = network.WLAN(network.STA_IF) - >>> wlan.active(True) - >>> wlan.connect('YourSSID', 'YourPassword') - >>> # Print the device IP address - >>> wlan.ifconfig()[0] - >>> '192.168.1.23' + >>> import network + >>> wlan = network.WLAN(network.STA_IF) + >>> wlan.active(True) + >>> wlan.connect('YourSSID', 'YourPassword') + >>> # Print the device IP address + >>> wlan.ifconfig()[0] + >>> '192.168.1.23' - - Enable the`WebREPL `_ - interface on the device: + - Enable the`WebREPL + `_ + interface on the device: - .. code-block:: python + .. code-block:: python - >>> import webrepl_setup + >>> import webrepl_setup - - Follow the instructions, set a password and reset your device. A websocket service should be available - by default on the port 8266 of your ESP8266/ESP32 and it can accept commands sent by platypush. - """ + - Follow the instructions, set a password and reset your device. A websocket service should be available + by default on the port 8266 of your ESP8266/ESP32 and it can accept commands sent by platypush. + """ - def __init__(self, devices: List[Union[Device, dict]] = None, **kwargs): + def __init__(self, devices: Optional[List[Union[Device, dict]]] = None, **kwargs): """ :param devices: List of configured device. Pre-configuring devices by name allows you to call the actions directly by device name, instead of specifying ``host``, ``port`` and ``password`` on each call. It @@ -95,12 +95,11 @@ class EspPlugin(Plugin): super().__init__(**kwargs) self.devices = [ - dev if isinstance(dev, Device) else Device(**dev) - for dev in (devices or []) + dev if isinstance(dev, Device) else Device(**dev) for dev in (devices or []) ] - self._devices_by_host = {dev['host']: dev for dev in self.devices} - self._devices_by_name = {dev['name']: dev for dev in self.devices if dev['name']} + self._devices_by_host = {dev.host: dev for dev in self.devices} + self._devices_by_name = {dev.name: dev for dev in self.devices if dev.name} self._connections: Dict[tuple, Connection] = {} def __del__(self): @@ -112,7 +111,8 @@ class EspPlugin(Plugin): def on_open(self, conn: Connection): def callback(*_): conn.on_connect() - self.logger.info('Connection to {} opened'.format(conn.ws.url)) + if conn.ws: + self.logger.info('Connection to %s opened', conn.ws.url) return callback @@ -127,7 +127,10 @@ class EspPlugin(Plugin): conn.on_password_requested() return - if conn.state in (conn.State.CONNECTED, conn.state.PASSWORD_REQUIRED) and msg.endswith('>>> '): + if conn.state in ( + conn.State.CONNECTED, + conn.state.PASSWORD_REQUIRED, + ) and msg.endswith('>>> '): conn.on_ready() return @@ -144,7 +147,8 @@ class EspPlugin(Plugin): return - self.logger.debug('Message received on {}: {}'.format(conn.ws.url, msg)) + if conn.ws: + self.logger.debug('Message received on %s: %s', conn.ws.url, msg) def callback(ws, msg): try: @@ -156,8 +160,7 @@ class EspPlugin(Plugin): return callback def on_data(self, conn: Connection): - # noinspection PyUnusedLocal - def handler(ws, data): + def handler(_, data): if conn.state == conn.State.WAITING_FILE_TRANSFER_RESPONSE: conn.on_recv_file_transfer_response(data) return @@ -170,7 +173,6 @@ class EspPlugin(Plugin): conn.on_chunk_received(data) return - # noinspection PyUnusedLocal def callback(ws, data, data_type, *_): try: if data_type == websocket.ABNF.OPCODE_BINARY: @@ -184,12 +186,13 @@ class EspPlugin(Plugin): def on_close(self, conn: Connection): def callback(*_): try: - conn.ws.close() + if conn.ws: + conn.ws.close() except Exception as e: - self.logger.warning('Could not close connection: {}'.format(str(e))) + self.logger.warning('Could not close connection: %s: %s', type(e), e) conn.on_close() - self.logger.info('Connection to {}:{} closed'.format(conn.host, conn.port)) + self.logger.info('Connection to %s:%d closed', conn.host, conn.port) return callback @@ -197,38 +200,96 @@ class EspPlugin(Plugin): def callback(*args): err = args[1] if len(args) > 1 else args[0] conn.on_close() - self.logger.warning('Websocket connection error: {}'.format(err)) + self.logger.warning('Websocket connection error: %s', err) return callback - # noinspection PyUnusedLocal - def _get_device(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, - password: Optional[str] = None, **kwargs) -> Device: + def _get_device( + self, + device: Optional[str] = None, + host: Optional[str] = None, + port: int = 8266, + password: Optional[str] = None, + **_, + ) -> Device: if device: - assert device in self._devices_by_name, 'No such device configured: ' + device + assert device in self._devices_by_name, ( + 'No such device configured: ' + device + ) return self._devices_by_name[device] + host = host or self.devices[0].host assert host and port, 'No host and port specified' if host in self._devices_by_host: return self._devices_by_host[host] return Device(host=host, port=port, password=password) - # noinspection PyUnusedLocal - def _get_connection(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, **kwargs) \ - -> Connection: + def _get_connection( + self, + device: Optional[str] = None, + host: Optional[str] = None, + port: int = 8266, + **_, + ) -> Optional[Connection]: if device: - assert device in self._devices_by_name, 'No such device configured: ' + device - device = self._devices_by_name[device] - host = device['host'] - port = device['port'] + assert device in self._devices_by_name, ( + 'No such device configured: ' + device + ) + dev = self._devices_by_name[device] + host = dev.host + port = dev.port assert host and port, 'No host and port specified' - return self._connections.get((host, port)) + conn = self._connections.get((host, port)) + return conn + + def _connect( + self, + device: Optional[str] = None, + host: Optional[str] = None, + port: int = 8266, + password: Optional[str] = None, + timeout: Optional[float] = 10.0, + ) -> Connection: + dev = self._get_device(device=device, host=host, port=port, password=password) + host = dev.host + port = dev.port + conn = self._get_connection(host=host, port=port) + if conn and conn.ws and conn.ws.sock and conn.ws.sock.connected: + return conn + + assert host and port, 'No host and port specified' + conn = Connection( + host=host, port=port, password=password, connect_timeout=timeout + ) + self._connections[(host, port)] = conn + + ws = websocket.WebSocketApp( + f'ws://{host}:{port}', + on_open=self.on_open(conn), + on_message=self.on_message(conn), + on_data=self.on_data(conn), + on_error=self.on_error(conn), + on_close=self.on_close(conn), + ) + + conn.ws = ws + wst = threading.Thread(target=ws.run_forever) + wst.daemon = True + wst.start() + conn.wait_ready() + return conn @action - def connect(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266, - password: Optional[str] = None, timeout: Optional[float] = 10.0): + def connect( + self, + device: Optional[str] = None, + host: Optional[str] = None, + port: int = 8266, + password: Optional[str] = None, + timeout: Optional[float] = 10.0, + ): """ Open a connection to an ESP device. @@ -238,57 +299,41 @@ class EspPlugin(Plugin): :param password: ESP WebREPL password. :param timeout: Connection timeout (default: 10 seconds). """ - device = self._get_device(device=device, host=host, port=port, password=password) - host = device['host'] - port = device['port'] - conn = self._get_connection(host=host, port=port) - if conn and conn.ws and conn.ws.sock.connected: - self.logger.info('Already connected to {}:{}'.format(host, port)) - return - - conn = Connection(host=host, port=port, password=password, connect_timeout=timeout) - self._connections[(host, port)] = conn - - ws = websocket.WebSocketApp('ws://{host}:{port}'.format(host=host, port=port), - on_open=self.on_open(conn), - on_message=self.on_message(conn), - on_data=self.on_data(conn), - on_error=self.on_error(conn), - on_close=self.on_close(conn)) - - conn.ws = ws - wst = threading.Thread(target=ws.run_forever) - wst.daemon = True - wst.start() - conn.wait_ready() + self._connect( + device=device, host=host, port=port, password=password, timeout=timeout + ) @action - def close(self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266): + def close( + self, device: Optional[str] = None, host: Optional[str] = None, port: int = 8266 + ): """ Close an active connection to a device. + :param device: Configured device name. Either ``device`` or ``host`` and ``port`` must be provided. :param host: ESP host. :param port: ESP port. """ - device = self._get_device(device=device, host=host, port=port) - host, port = [device['host'], device['port']] + dev = self._get_device(device=device, host=host, port=port) + host, port = [dev.host, dev.port] conn = self._connections.get((host, port)) - assert conn, 'No active connection found to {}:{}'.format(host, port) + assert conn, f'No active connection found to {host}:{port}' conn.close() - del self._connections[(host, port)] + self._connections.pop((host, port), None) - # noinspection PyUnusedLocal @action - def execute(self, - code: str, - device: Optional[str] = None, - host: Optional[str] = None, - port: int = 8266, - password: Optional[str] = None, - conn_timeout: Optional[float] = 10.0, - recv_timeout: Optional[float] = 30.0, - wait_response: bool = True, - **kwargs) -> Response: + def execute( + self, + code: str, + device: Optional[str] = None, + host: Optional[str] = None, + port: int = 8266, + password: Optional[str] = None, + conn_timeout: Optional[float] = 10.0, + recv_timeout: Optional[float] = 30.0, + wait_response: bool = True, + **_, + ) -> Response: """ Run raw Python code on the ESP device. @@ -302,14 +347,25 @@ class EspPlugin(Plugin): :param wait_response: Wait for the response from the device (default: True) :return: The response returned by the Micropython interpreter, as a string. """ - device = self._get_device(device=device, host=host, port=port, password=password) - self.connect(host=device['host'], port=device['port'], password=device['password'], timeout=conn_timeout) - conn = self._connections.get((device['host'], device['port'])) + dev = self._get_device(device=device, host=host, port=port, password=password) + self.connect( + host=dev.host, + port=dev.port, + password=dev.password, + timeout=conn_timeout, + ) + conn = self._connections.get((dev.host, dev.port)) + assert conn, f'No active connection found to {dev.host}:{dev.port}' try: - return conn.send(code, timeout=recv_timeout, wait_response=wait_response) + return Response( + output=conn.send( + code, timeout=recv_timeout, wait_response=wait_response + ) + ) except Exception as e: - conn.close() + if conn: + conn.close() raise e @action @@ -323,12 +379,11 @@ class EspPlugin(Plugin): """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine -pin = machine.Pin({pin}, machine.Pin.OUT{pull_up}) +pin = machine.Pin({pin}, machine.Pin.OUT{", machine.Pin.PULL_UP" if pull_up else ""}) pin.on() -'''.format(pin=pin, pull_up=', machine.Pin.PULL_UP' if pull_up else '') - +''' self.execute(code, **kwargs) @action @@ -342,12 +397,11 @@ pin.on() """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine -pin = machine.Pin({pin}, machine.Pin.OUT{pull_up}) +pin = machine.Pin({pin}, machine.Pin.OUT{", machine.Pin.PULL_UP" if pull_up else ""}) pin.off() -'''.format(pin=pin, pull_up=', machine.Pin.PULL_UP' if pull_up else '') - +''' self.execute(code, **kwargs) @action @@ -361,19 +415,20 @@ pin.off() """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine -pin = machine.Pin({pin}, machine.Pin.OUT{pull_up}) +pin = machine.Pin({pin}, machine.Pin.OUT{", machine.Pin.PULL_UP" if pull_up else ""}) if pin.value(): pin.off() else: pin.on() -'''.format(pin=pin, pull_up=', machine.Pin.PULL_UP' if pull_up else '') - +''' self.execute(code, **kwargs) @action - def pin_read(self, pin: Union[int, str], out: bool = False, pull_up: bool = False, **kwargs) -> bool: + def pin_read( + self, pin: Union[int, str], out: bool = False, pull_up: bool = False, **kwargs + ) -> bool: """ Get the ON/OFF value of a PIN. @@ -385,12 +440,11 @@ else: """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine -pin = machine.Pin({pin}, machine.Pin.{inout}{pull_up}) +pin = machine.Pin({pin}, machine.Pin.{"OUT" if out else "IN"}{", machine.Pin.PULL_UP" if pull_up else ""}) pin.value() -'''.format(pin=pin, inout='OUT' if out else 'IN', pull_up=', machine.Pin.PULL_UP' if pull_up else '') - +''' return bool(self.execute(code, **kwargs).output) @action @@ -405,17 +459,18 @@ pin.value() :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. :return: A value between ``0`` and ``1024``. """ - code = ''' + code = f''' import machine adc = machine.ADC({pin}) adc.read() -'''.format(pin=pin) - +''' response = self.execute(code, **kwargs) return int(response.output) @action - def pwm_freq(self, pin: Union[int, str], freq: Optional[int] = None, **kwargs) -> Optional[int]: + def pwm_freq( + self, pin: Union[int, str], freq: Optional[int] = None, **kwargs + ) -> Optional[int]: """ Get/set the frequency of a PWM PIN. @@ -428,15 +483,16 @@ adc.read() code = ''' import machine pin = machine.PWM(machine.Pin({pin})) -pin.freq({freq}) -'''.format(pin=pin, freq=freq if freq else '') - +pin.freq({freq if freq is not None else ""}) +''' ret = self.execute(code, **kwargs).output if not freq: return int(ret) @action - def pwm_duty(self, pin: Union[int, str], duty: Optional[int] = None, **kwargs) -> Optional[int]: + def pwm_duty( + self, pin: Union[int, str], duty: Optional[int] = None, **kwargs + ) -> Optional[int]: """ Get/set the duty cycle of a PWM PIN. @@ -446,18 +502,23 @@ pin.freq({freq}) """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine pin = machine.PWM(machine.Pin({pin})) -pin.duty({duty}) -'''.format(pin=pin, duty=duty if duty else '') - +pin.duty({duty if duty is not None else ""}) +''' ret = self.execute(code, **kwargs).output if not duty: return int(ret) @action - def pwm_on(self, pin: Union[int, str], freq: Optional[int] = None, duty: Optional[int] = None, **kwargs): + def pwm_on( + self, + pin: Union[int, str], + freq: Optional[int] = None, + duty: Optional[int] = None, + **kwargs, + ): """ Set the specified PIN to HIGH. @@ -468,7 +529,7 @@ pin.duty({duty}) """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine pin = machine.PWM(machine.Pin({pin})) @@ -478,8 +539,7 @@ if {duty}: pin.duty({duty}) pin.on() -'''.format(pin=pin, freq=freq, duty=duty) - +''' self.execute(code, **kwargs) @action @@ -492,18 +552,26 @@ pin.on() """ device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine pin = machine.PWM(machine.Pin({pin})) pin.deinit() -'''.format(pin=pin) - +''' self.execute(code, **kwargs) @action - def spi_open(self, id=1, baudrate: int = 1000000, polarity: int = 0, phase: int = 0, - bits: int = 8, sck: Optional[int] = None, mosi: Optional[int] = None, - miso: Optional[int] = None, **kwargs): + def spi_open( + self, + id=1, + baudrate: int = 1000000, + polarity: int = 0, + phase: int = 0, + bits: int = 8, + sck: Optional[int] = None, + mosi: Optional[int] = None, + miso: Optional[int] = None, + **kwargs, + ): """ Open a connection to an SPI port. Note that ``sck``, ``mosi`` and ``miso`` parameters are only allowed if you're setting up a software @@ -522,17 +590,17 @@ pin.deinit() :param miso: MISO PIN number. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' args = { 'baudrate': {baudrate}, 'polarity': {polarity}, 'phase': {phase}, 'bits': {bits}, } -'''.format(baudrate=baudrate, polarity=polarity, phase=phase, bits=bits) - +''' self.execute(code, **kwargs) - code = ''' + + code = f''' import machine if {sck}: @@ -541,11 +609,10 @@ if {mosi}: args['mosi'] = machine.Pin({mosi}) if {miso}: args['miso'] = machine.Pin({miso}) -'''.format(sck=sck, mosi=mosi, miso=miso) +''' self.execute(code, **kwargs) - code = 'spi = machine.SPI({id}, **args)'.format(id=id) - self.execute(code, **kwargs) + self.execute(f'spi = machine.SPI({id}, **args)', **kwargs) @action def spi_close(self, **kwargs): @@ -570,12 +637,17 @@ if {miso}: data can't be decoded to a string. """ self.spi_open(**kwargs) - code = 'spi.read({size})'.format(size=size) - response = self.execute(code, **kwargs).output + response = self.execute(f'spi.read({size})', **kwargs).output try: + if isinstance(response, str): + return response + return response.decode() except UnicodeDecodeError: + assert isinstance( + response, bytes + ), f'Invalid response type: {type(response)}' return base64.encodebytes(response).decode() @action @@ -590,19 +662,23 @@ if {miso}: :meth:`platypush.plugins.esp.EspPlugin.execute`. """ if binary: - data = base64.decodebytes(data.encode()) + bin_data = base64.decodebytes(data.encode()) else: - data = data.encode() + bin_data = data.encode() - data = 'b"' + ''.join(['\\x{:02x}'.format(b) for b in data]) + '"' + data = 'b"' + ''.join([f'\\x{b:02x}' for b in bin_data]) + '"' self.spi_open(**kwargs) - - code = 'spi.write({data})'.format(data=data) - self.execute(code, **kwargs) + self.execute(f'spi.write({data})', **kwargs) @action - def i2c_open(self, scl: Optional[int] = None, sda: Optional[int] = None, id: int = -1, baudrate: int = 400000, - **kwargs): + def i2c_open( + self, + scl: Optional[int] = None, + sda: Optional[int] = None, + id: int = -1, + baudrate: int = 400000, + **kwargs, + ): """ Open a connection to an I2C (or "two-wire") port. @@ -615,22 +691,27 @@ if {miso}: :param baudrate: Port frequency/clock rate (default: 400 kHz). :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = ( + ''' import machine args = {} if {scl}: - args['scl'] = machine.Pin(''' + str(scl) + ''') + args['scl'] = machine.Pin(''' + + str(scl) + + ''') if {sda}: - args['sda'] = machine.Pin(''' + str(sda) + ''') + args['sda'] = machine.Pin(''' + + str(sda) + + ''') ''' + ) self.execute(code, **kwargs) - code = ''' + code = f''' i2c = machine.I2C(id={id}, freq={baudrate}, **args) -'''.format(id=id, baudrate=baudrate) - +''' self.execute(code, **kwargs) @action @@ -654,7 +735,7 @@ i2c = machine.I2C(id={id}, freq={baudrate}, **args) :return: List of 7-bit addresses. """ self.i2c_open(**kwargs) - return self.execute('i2c.scan', **kwargs).output + return self.execute('i2c.scan', **kwargs).output # type: ignore @action def i2c_read(self, address: int, size: int, **kwargs) -> str: @@ -669,12 +750,16 @@ i2c = machine.I2C(id={id}, freq={baudrate}, **args) data can't be decoded to a string. """ self.i2c_open(**kwargs) - code = 'i2c.readfrom({address}, {size})'.format(address=address, size=size) - response = self.execute(code, **kwargs).output + response = self.execute(f'i2c.readfrom({address}, {size})', **kwargs).output try: + if isinstance(response, str): + return response return response.decode() except UnicodeDecodeError: + assert isinstance( + response, bytes + ), f'Invalid response type: {type(response)}' return base64.encodebytes(response).decode() @action @@ -692,14 +777,13 @@ i2c = machine.I2C(id={id}, freq={baudrate}, **args) data can't be decoded to a string. """ if binary: - data = base64.decodebytes(data.encode()) + bin_data = base64.decodebytes(data.encode()) else: - data = data.encode() + bin_data = data.encode() - data = 'b"' + ''.join(['\\x{:02x}'.format(b) for b in data]) + '"' + data = 'b"' + ''.join([f'\\x{b:02x}' for b in bin_data]) + '"' self.i2c_open(**kwargs) - code = 'i2c.writeto({address}, {data})'.format(address=address, data=data) - self.execute(code, **kwargs) + self.execute(f'i2c.writeto({address}, {data})', **kwargs) @action def i2c_start(self, **kwargs): @@ -728,9 +812,19 @@ i2c = machine.I2C(id={id}, freq={baudrate}, **args) self.execute('i2c.stop()', **kwargs) @action - def uart_open(self, id=1, baudrate: Optional[int] = 9600, bits: Optional[int] = 8, parity: Optional[int] = None, - stop: int = 1, tx_pin: Optional[int] = None, rx_pin: Optional[int] = None, - timeout: Optional[float] = None, timeout_char: Optional[float] = None, **kwargs): + def uart_open( + self, + id=1, + baudrate: Optional[int] = 9600, + bits: Optional[int] = 8, + parity: Optional[int] = None, + stop: int = 1, + tx_pin: Optional[int] = None, + rx_pin: Optional[int] = None, + timeout: Optional[float] = None, + timeout_char: Optional[float] = None, + **kwargs, + ): """ Open a connection to a UART port. @@ -745,7 +839,7 @@ i2c = machine.I2C(id={id}, freq={baudrate}, **args) :param timeout_char: Specify the time to wait between characters in seconds. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' args = { 'bits': {bits}, 'parity': {parity}, @@ -760,16 +854,13 @@ if {timeout}: args['timeout'] = {timeout} if {timeout_char}: args['timeout_char'] = {timeout_char} -'''.format(bits=bits, parity=parity, stop=stop, tx_pin=tx_pin, rx_pin=rx_pin, - timeout=timeout, timeout_char=timeout_char) - +''' self.execute(code, **kwargs) - code = ''' + code = f''' import machine uart = machine.UART({id}, {baudrate}, **args) -'''.format(id=id, baudrate=baudrate) - +''' self.execute(code, **kwargs) @action @@ -796,18 +887,23 @@ uart = machine.UART({id}, {baudrate}, **args) """ self.uart_open(**kwargs) - code = ''' + code = f''' args = [] if {size}: args.append({size}) uart.read(*args) -'''.format(size=size) +''' response = self.execute(code, **kwargs).output try: + if isinstance(response, str): + return response return response.decode() except UnicodeDecodeError: + assert isinstance( + response, bytes + ), f'Invalid response type: {type(response)}' return base64.encodebytes(response).decode() @action @@ -824,8 +920,13 @@ uart.read(*args) response = self.execute('uart.readline()', **kwargs).output try: + if isinstance(response, str): + return response return response.decode() except UnicodeDecodeError: + assert isinstance( + response, bytes + ), f'Invalid response type: {type(response)}' return base64.encodebytes(response).decode() @action @@ -840,15 +941,13 @@ uart.read(*args) :meth:`platypush.plugins.esp.EspPlugin.execute`. """ if binary: - data = base64.decodebytes(data.encode()) + bin_data = base64.decodebytes(data.encode()) else: - data = data.encode() + bin_data = data.encode() - data = 'b"' + ''.join(['\\x{:02x}'.format(b) for b in data]) + '"' + data = 'b"' + ''.join([f'\\x{b:02x}' for b in bin_data]) + '"' self.uart_open(**kwargs) - - code = 'uart.write({data})'.format(data=data) - self.execute(code, **kwargs) + self.execute(f'uart.write({data})', **kwargs) @action def uart_send_break(self, **kwargs): @@ -860,8 +959,7 @@ uart.read(*args) :meth:`platypush.plugins.esp.EspPlugin.execute`. """ self.uart_open(**kwargs) - code = 'uart.sendbreak()' - self.execute(code, **kwargs) + self.execute('uart.sendbreak()', **kwargs) @action def get_freq(self, **kwargs) -> int: @@ -874,7 +972,7 @@ uart.read(*args) import machine machine.freq() ''' - return self.execute(code, **kwargs).output + return int(self.execute(code, **kwargs).output) @action def set_freq(self, freq: int, **kwargs): @@ -883,10 +981,10 @@ machine.freq() :param freq: New frequency in Hz. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import machine machine.freq({freq}) -'''.format(freq=freq) +''' self.execute(code, **kwargs) @action @@ -947,10 +1045,10 @@ machine.enable_irq() :param seconds: Sleep seconds. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import time -time.sleep({sec}) -'''.format(sec=seconds) +time.sleep({seconds}) +''' return self.execute(code, wait_response=False, **kwargs).output @@ -964,10 +1062,10 @@ time.sleep({sec}) :param seconds: Sleep seconds (default: sleep until there are some PIN/RTC events to process) :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import machine -machine.lightsleep({msec}) -'''.format(msec=int(seconds * 1000) if seconds else '') +machine.lightsleep({int(seconds * 1000) if seconds else ''}) +''' return self.execute(code, wait_response=False, **kwargs).output @@ -981,10 +1079,10 @@ machine.lightsleep({msec}) :param seconds: Sleep seconds (default: sleep until there are some PIN/RTC events to process) :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import machine -machine.deepsleep({msec}) -'''.format(msec=int(seconds * 1000) if seconds else '') +machine.deepsleep({int(seconds * 1000) if seconds else ''}) +''' return self.execute(code, wait_response=False, **kwargs).output @@ -1000,7 +1098,7 @@ machine.deepsleep({msec}) """ code = ''' import machine -print(':'.join(['{:02x}'.format(b) for b in machine.unique_id()])) +print(':'.join([f'{b:02x}' for b in machine.unique_id()])) ''' return self.execute(code, **kwargs).output @@ -1013,10 +1111,10 @@ print(':'.join(['{:02x}'.format(b) for b in machine.unique_id()])) :param new_password: New password. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import webrepl -webrepl._webrepl.password({password}) -'''.format(password=new_password) +webrepl._webrepl.password({new_password}) +''' return self.execute(code, **kwargs).output @@ -1029,7 +1127,7 @@ webrepl._webrepl.password({password}) :param passphrase: WiFi passphrase. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import network import time @@ -1039,29 +1137,35 @@ wlan.connect('{essid}', '{passphrase}') while not wlan.isconnected(): time.sleep(1) -'''.format(essid=essid, passphrase=passphrase) - +''' self.execute(code, **kwargs) def net_enabled_change(self, net: str, enabled: bool, **kwargs): - code = ''' + code = f''' import network -wlan = network.WLAN({}) -wlan.active({}) -'''.format(net, str(enabled)) +wlan = network.WLAN({net}) +wlan.active({enabled}) +''' self.execute(code, **kwargs) - def net_ifconfig(self, net: str, ip: Optional[str] = None, netmask: Optional[str] = None, - gateway: Optional[str] = None, dns: Optional[str] = None, **kwargs): - code = ''' + def net_ifconfig( + self, + net: str, + ip: Optional[str] = None, + netmask: Optional[str] = None, + gateway: Optional[str] = None, + dns: Optional[str] = None, + **kwargs, + ): + code = f''' import json import network -wlan = network.WLAN({}) +wlan = network.WLAN({net}) print(json.dumps(list(wlan.ifconfig()))) -'''.format(net) +''' - config = self.execute(code, **kwargs).output + config = list(self.execute(code, **kwargs).output) # type: ignore if ip: config[0] = ip if netmask: @@ -1071,15 +1175,20 @@ print(json.dumps(list(wlan.ifconfig()))) if dns: config[3] = dns - code = 'wlan.ifconfig({})'.format(tuple(config)) - self.execute(code, **kwargs) + self.execute(f'wlan.ifconfig({tuple(config)})', **kwargs) @action - def wifi_config(self, ip: Optional[str] = None, netmask: Optional[str] = None, - gateway: Optional[str] = None, dns: Optional[str] = None, **kwargs) \ - -> Optional[EspWifiConfigResult]: + def wifi_config( + self, + ip: Optional[str] = None, + netmask: Optional[str] = None, + gateway: Optional[str] = None, + dns: Optional[str] = None, + **kwargs, + ) -> Optional[dict]: """ Get or set network properties for the WiFi interface. + If called with no arguments it will return the configuration of the interface. :param ip: IP address. @@ -1087,18 +1196,34 @@ print(json.dumps(list(wlan.ifconfig()))) :param gateway: Default gateway address. :param dns: Default DNS address. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. + :return: .. schema:: esp.WifiConfigSchema """ if ip or netmask or gateway or dns: - self.net_ifconfig(net_type='network.STA_IF', ip=ip, netmask=netmask, gateway=gateway, dns=dns, **kwargs) + self.net_ifconfig( + net_type='network.STA_IF', + ip=ip, + netmask=netmask, + gateway=gateway, + dns=dns, + **kwargs, + ) return return self.net_config('network.STA_IF', **kwargs) @action - def ap_config(self, ip: Optional[str] = None, netmask: Optional[str] = None, - gateway: Optional[str] = None, dns: Optional[str] = None, essid: Optional[str] = None, - passphrase: Optional[str] = None, channel: Optional[int] = None, - hidden: Optional[bool] = None, **kwargs) -> Optional[EspWifiConfigResult]: + def ap_config( + self, + ip: Optional[str] = None, + netmask: Optional[str] = None, + gateway: Optional[str] = None, + dns: Optional[str] = None, + essid: Optional[str] = None, + passphrase: Optional[str] = None, + channel: Optional[int] = None, + hidden: Optional[bool] = None, + **kwargs, + ) -> Optional[dict]: """ Get or set network properties for the WiFi access point interface. If called with no arguments it will return the configuration of the interface. @@ -1112,24 +1237,39 @@ print(json.dumps(list(wlan.ifconfig()))) :param channel: WiFi channel. :param hidden: Whether the network is hidden. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. + :return: .. schema:: esp.WifiConfigSchema """ has_args = False - self.execute(code='import network; ap = network.WLAN(network.AP_IF)'.format(essid=essid), **kwargs) + self.execute( + code='import network; ap = network.WLAN(network.AP_IF)', + **kwargs, + ) if ip or netmask or gateway or dns: - self.net_ifconfig(net_type='network.AP_IF', ip=ip, netmask=netmask, gateway=gateway, dns=dns, **kwargs) + self.net_ifconfig( + net_type='network.AP_IF', + ip=ip, + netmask=netmask, + gateway=gateway, + dns=dns, + **kwargs, + ) has_args = True + if essid: - self.execute(code='ap.config(essid="{essid}")'.format(essid=essid), **kwargs) + self.execute(code=f'ap.config(essid="{essid}")', **kwargs) has_args = True if passphrase: - self.execute(code='ap.config(password="{passphrase}")'.format(passphrase=passphrase), **kwargs) + self.execute( + code=f'ap.config(password="{passphrase}")', + **kwargs, + ) has_args = True if channel: - self.execute(code='ap.config(channel={channel})'.format(channel=channel), **kwargs) + self.execute(code=f'ap.config(channel={channel})', **kwargs) has_args = True if hidden is not None: - self.execute(code='ap.config(hidden={hidden})'.format(hidden=str(hidden)), **kwargs) + self.execute(code=f'ap.config(hidden={hidden})', **kwargs) has_args = True if has_args: @@ -1187,21 +1327,30 @@ wlan.disconnect() """ self.net_enabled_change('network.AP_IF', False, **kwargs) - def net_config(self, net_type: str, **kwargs) -> Optional[EspWifiConfigResult]: + def net_config(self, net_type: str, **kwargs) -> Optional[dict]: # Split the code into multiple execution to overcome the size limitation of the input ESP WebREPL buffer. - code = ''' + code = ( + ''' import network import json -net = network.WLAN(''' + net_type + ''') +net = network.WLAN(''' + + net_type + + ''') ifconfig = net.ifconfig() mac = ':'.join([hex(c)[2:] for c in net.config('mac')])''' + ) self.execute(code, **kwargs) code = ''' active = net.active() essid = net.config('essid') -channel = net.config('channel') + +try: + channel = net.config('channel') +except: + channel = None + try: hidden = net.config('hidden') except: @@ -1230,14 +1379,16 @@ print(json.dumps(config)) net = self.execute(code, **kwargs).output if not net: return - return EspWifiConfigResult(**net) + + return dict(WifiConfigSchema().dump(net)) @action - def wifi_scan(self, **kwargs) -> List[EspWifiScanResult]: + def wifi_scan(self, **kwargs) -> List[dict]: """ Scan the available networks. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. + :return: .. schema:: esp.WifiScanResult(many=True) """ code = ''' import network @@ -1246,8 +1397,8 @@ import json wlan = network.WLAN(network.STA_IF) wlan.active(True) print(json.dumps([{ - 'essid':net[0].decode(), - 'bssid':''.join(['\\\\x' + hex(c)[2:] for c in net[1]]), + 'essid': net[0].decode(), + 'bssid': ':'.join(['%02x' % c for c in net[1]]), 'channel': net[2], 'rssi': net[3], 'auth_mode': net[4], @@ -1258,10 +1409,10 @@ print(json.dumps([{ if not results: return [] - return [EspWifiScanResult(**network) for network in results] + return WifiScanResultSchema().dump(results, many=True) def open_db(self, dbfile: str, **kwargs): - code = ''' + code = f''' import btree try: @@ -1270,12 +1421,12 @@ except OSError: dbfile = open('{dbfile}', 'w+b') db = btree.open(dbfile) -'''.format(dbfile=dbfile) +''' self.close_db(dbfile, **kwargs) self.execute(code, **kwargs) - def close_db(self, dbfile: str, **kwargs): + def close_db(self, *_, **kwargs): code = ''' try: db.close() @@ -1284,7 +1435,7 @@ try: dbfile = None except: pass -'''.format(dbfile=dbfile) +''' self.execute(code, **kwargs) @@ -1302,10 +1453,10 @@ except: :param value: Value to set. :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' -db[b'{key}'] = b'{value}' + code = f''' +db[b'{self.string_quote(key)}'] = b'{self.string_quote(str(value))}' db.flush() -'''.format(key=self.string_quote(key), value=self.string_quote(str(value))) +''' try: self.open_db(dbfile, **kwargs) @@ -1323,12 +1474,12 @@ db.flush() :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. :return: Whichever value is stored as output, or null if the key doesn't exist. """ - code = ''' + code = f''' try: - print(db[b'{key}'].decode()) + print(db[b'{self.string_quote(key)}'].decode()) except KeyError: pass -'''.format(key=self.string_quote(key)) +''' try: self.open_db(dbfile, **kwargs) @@ -1352,7 +1503,7 @@ print(json.dumps([k.decode() for k in db.keys()])) try: self.open_db(dbfile, **kwargs) - return self.execute(code, **kwargs).output + return self.execute(code, **kwargs).output # type: ignore finally: self.close_db(dbfile, **kwargs) @@ -1371,7 +1522,7 @@ print(json.dumps([k.decode() for k in db.values()])) try: self.open_db(dbfile, **kwargs) - return self.execute(code, **kwargs).output + return self.execute(code, **kwargs).output # type: ignore finally: self.close_db(dbfile, **kwargs) @@ -1391,7 +1542,7 @@ print(json.dumps({k.decode(): v.decode() for k, v in db.items()})) try: self.open_db(dbfile, **kwargs) - return self.execute(code, **kwargs).output + return self.execute(code, **kwargs) # type: ignore finally: self.close_db(dbfile, **kwargs) @@ -1418,14 +1569,14 @@ ntptime.settime() :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ directory = self.string_quote(directory) - code = ''' + code = f''' import os import json -print(json.dumps(os.listdir('{dir}'))) -'''.format(dir=directory) +print(json.dumps(os.listdir('{directory}'))) +''' - return self.execute(code, **kwargs).output + return self.execute(code, **kwargs).output # type: ignore @action def chdir(self, directory: str, **kwargs): @@ -1436,10 +1587,10 @@ print(json.dumps(os.listdir('{dir}'))) :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ directory = self.string_quote(directory) - code = ''' + code = f''' import os -os.chdir('{dir}') -'''.format(dir=directory) +os.chdir('{directory}') +''' self.execute(code, **kwargs) @@ -1452,10 +1603,10 @@ os.chdir('{dir}') :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ directory = self.string_quote(directory) - code = ''' + code = f''' import os -os.mkdir('{dir}') -'''.format(dir=directory) +os.mkdir('{directory}') +''' self.execute(code, **kwargs) @@ -1468,10 +1619,10 @@ os.mkdir('{dir}') :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ directory = self.string_quote(directory) - code = ''' + code = f''' import os -os.rmdir('{dir}') -'''.format(dir=directory) +os.rmdir('{directory}') +''' self.execute(code, **kwargs) @@ -1486,10 +1637,10 @@ os.rmdir('{dir}') """ name = self.string_quote(name) new_name = self.string_quote(new_name) - code = ''' + code = f''' import os -os.rename('{old}', '{new}') -'''.format(old=name, new=new_name) +os.rename('{name}', '{new_name}') +''' self.execute(code, **kwargs) @@ -1502,10 +1653,10 @@ os.rename('{old}', '{new}') :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ file = self.string_quote(file) - code = ''' + code = f''' import os os.remove('{file}') -'''.format(file=file) +''' self.execute(code, **kwargs) @@ -1517,17 +1668,19 @@ os.remove('{file}') :param size: Number of random bytes to be generated (default: 1). :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.execute`. """ - code = ''' + code = f''' import os import json print([b for b in os.urandom({size})]) -'''.format(size=size) +''' - return self.execute(code, **kwargs).output + return self.execute(code, **kwargs).output # type: ignore @action - def file_get(self, file: str, binary: bool = False, timeout: Optional[float] = 60.0, **kwargs) -> str: + def file_get( + self, file: str, binary: bool = False, timeout: Optional[float] = 60.0, **kwargs + ) -> str: """ Get the content of a file on the board. @@ -1537,10 +1690,10 @@ print([b for b in os.urandom({size})]) :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.connect`. """ device = self._get_device(**kwargs) - host = device['host'] - port = device['port'] - self.connect(host=host, port=port, password=device['password']) - conn = self._get_connection(host=host, port=port) + host = device.host + port = device.port + self.connect(host=host, port=port, password=device.password) + conn = self._connect(host=host, port=port) with io.BytesIO() as buffer: conn.file_download(file, buffer, timeout=timeout) @@ -1554,7 +1707,13 @@ print([b for b in os.urandom({size})]) return data @action - def file_upload(self, source: str, destination: Optional[str] = None, timeout: Optional[float] = 60.0, **kwargs): + def file_upload( + self, + source: str, + destination: Optional[str] = None, + timeout: Optional[float] = 60.0, + **kwargs, + ): """ Upload a file to the board. @@ -1565,14 +1724,16 @@ print([b for b in os.urandom({size})]) :param kwargs: Parameters to pass to :meth:`platypush.plugins.esp.EspPlugin.connect`. """ device = self._get_device(**kwargs) - host = device['host'] - port = device['port'] - self.connect(host=host, port=port, password=device['password']) - conn = self._get_connection(host=host, port=port) + host = device.host + port = device.port + self.connect(host=host, port=port, password=device.password) + conn = self._connect(host=host, port=port) conn.file_upload(source=source, destination=destination, timeout=timeout) @action - def file_download(self, source: str, destination: str, timeout: Optional[float] = 60.0, **kwargs): + def file_download( + self, source: str, destination: str, timeout: Optional[float] = 60.0, **kwargs + ): """ Download a file from the board to the local machine. @@ -1587,27 +1748,29 @@ print([b for b in os.urandom({size})]) destination = os.path.join(destination, filename) device = self._get_device(**kwargs) - host = device['host'] - port = device['port'] - self.connect(host=host, port=port, password=device['password']) - conn = self._get_connection(host=host, port=port) + host = device.host + port = device.port + self.connect(host=host, port=port, password=device.password) + conn = self._connect(host=host, port=port) with open(destination, 'wb') as f: conn.file_download(source, f, timeout=timeout) - def _dht_get_value(self, pin: Union[int, str], dht_type: int, value: str, **kwargs) -> float: + def _dht_get_value( + self, pin: Union[int, str], dht_type: int, value: str, **kwargs + ) -> float: device = self._get_device(**kwargs) pin = device.get_pin(pin) - code = ''' + code = f''' import machine import dht -dht_sensor = dht.DHT{type}(machine.Pin({pin})) +dht_sensor = dht.DHT{dht_type}(machine.Pin({pin})) dht_sensor.measure() dht_sensor.{value}() -'''.format(pin=pin, type=dht_type, value=value) +''' - return self.execute(code, **kwargs).output + return float(self.execute(code, **kwargs).output) @action def dht11_get_temperature(self, pin: Union[int, str], **kwargs) -> float: diff --git a/platypush/plugins/esp/models/device.py b/platypush/plugins/esp/models/device.py index 4b019a586..ac1b8ce5e 100644 --- a/platypush/plugins/esp/models/device.py +++ b/platypush/plugins/esp/models/device.py @@ -1,30 +1,44 @@ +from dataclasses import dataclass from typing import Optional, List, Union -from platypush.message import Mapping - -class Pin(Mapping): +@dataclass +class Pin: """ This class models the configuration for the PIN of a device. """ - def __init__(self, number: int, name: Optional[str] = None, pwm: bool = False, pull_up: bool = False): - super().__init__(number=number, name=name, pwm=pwm, pull_up=pull_up) + + number: int + name: Optional[str] = None + pwm: bool = False + pull_up: bool = False -class Device(Mapping): +@dataclass +class Device: """ This class models the properties of a configured ESP device. """ - def __init__(self, host: str, port: int = 8266, password: Optional[str] = None, - name: Optional[str] = None, pins: List[Union[Pin, dict]] = None): - pins = [ - pin if isinstance(pin, Pin) else Pin(**pin) - for pin in (pins or []) + + host: str + port: int = 8266 + password: Optional[str] = None + name: Optional[str] = None + pins: Optional[List[Union[Pin, dict]]] = None + + @property + def _pins(self): + return [ + pin if isinstance(pin, Pin) else Pin(**pin) for pin in (self.pins or []) ] - super().__init__(host=host, port=port, password=password, pins=pins, name=name) - self._pin_by_name = {pin['name']: pin for pin in self['pins'] if pin['name']} - self._pin_by_number = {pin['number']: pin for pin in self['pins']} + @property + def _pins_by_name(self): + return {pin.name: pin for pin in self._pins if pin.name} + + @property + def _pins_by_number(self): + return {pin.number: pin for pin in self._pins} def get_pin(self, pin) -> int: try: @@ -32,8 +46,9 @@ class Device(Mapping): except ValueError: pass - assert pin in self._pin_by_name, 'No such PIN configured: {}'.format(pin) - return self._pin_by_name[pin]['number'] + pin_obj = self._pins_by_name.get(pin) + assert pin_obj, f'No such PIN configured: {pin}' + return pin_obj.number # vim:sw=4:ts=4:et: diff --git a/platypush/schemas/esp.py b/platypush/schemas/esp.py new file mode 100644 index 000000000..19b15d099 --- /dev/null +++ b/platypush/schemas/esp.py @@ -0,0 +1,148 @@ +from marshmallow import EXCLUDE, fields +from marshmallow.schema import Schema + + +class WifiScanResultSchema(Schema): + """ + Schema for Wi-Fi scan results. + """ + + class Meta: # type: ignore + """ + Exclude unknown fields. + """ + + unknown = EXCLUDE + + essid = fields.Str( + required=True, + metadata={ + "description": "ESSID of the Wi-Fi network.", + "example": "MyNetwork", + }, + ) + + bssid = fields.Str( + required=True, + metadata={ + "description": "BSSID of the Wi-Fi network.", + "example": "00:11:22:33:44:55", + }, + ) + + channel = fields.Int( + required=True, + metadata={ + "description": "Channel of the Wi-Fi network.", + "example": 6, + }, + ) + + rssi = fields.Int( + required=True, + metadata={ + "description": "RSSI of the Wi-Fi network.", + "example": -50, + }, + ) + + auth_mode = fields.Int( + required=True, + metadata={ + "description": "Authentication mode of the Wi-Fi network.", + "example": 3, + }, + ) + + hidden = fields.Bool( + required=True, + metadata={ + "description": "Whether the Wi-Fi network is hidden.", + "example": False, + }, + ) + + +class WifiConfigSchema(Schema): + """ + Schema for Wi-Fi configuration. + """ + + class Meta: # type: ignore + """ + Exclude unknown fields. + """ + + unknown = EXCLUDE + + ip = fields.Str( + required=True, + metadata={ + "description": "IP address of the Wi-Fi interface.", + "example": "192.168.1.10", + }, + ) + + netmask = fields.Str( + required=True, + metadata={ + "description": "Netmask of the Wi-Fi network.", + "example": "255.255.255.0", + }, + ) + + gateway = fields.Str( + required=True, + metadata={ + "description": "Gateway of the Wi-Fi network.", + "example": "192.168.1.1", + }, + ) + + dns = fields.Str( + required=True, + metadata={ + "description": "DNS server of the Wi-Fi network.", + "example": "1.1.1.1", + }, + ) + + mac = fields.Str( + required=True, + metadata={ + "description": "MAC address of the Wi-Fi network.", + "example": "00:11:22:33:44:55", + }, + ) + + active = fields.Bool( + required=True, + metadata={ + "description": "Whether the Wi-Fi network is active.", + "example": True, + }, + ) + + essid = fields.Str( + required=True, + metadata={ + "description": "ESSID of the Wi-Fi network.", + "example": "MyNetwork", + }, + ) + + channel = fields.Int( + required=True, + metadata={ + "description": "Channel of the Wi-Fi network.", + "example": 6, + }, + ) + + hidden = fields.Bool( + required=True, + metadata={ + "description": "Whether the Wi-Fi network is hidden.", + "example": False, + }, + )