[#398] Refactored esp plugin.
Some checks reported errors
continuous-integration/drone/push Build was killed

- Converted `Response` objects into `Schema`s.

- Removed the last references to the deprecated `Mapping` object.

- Fixed all errors and warnings in the plugin.
This commit is contained in:
Fabio Manganiello 2024-05-15 01:28:15 +02:00
parent 55e230c361
commit 20f3eaf375
Signed by: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 674 additions and 451 deletions

View file

@ -30,6 +30,11 @@ class Message:
""" """
class Encoder(json.JSONEncoder): class Encoder(json.JSONEncoder):
"""
JSON encoder that can serialize custom types commonly handled in
Platypush messages.
"""
@staticmethod @staticmethod
def parse_numpy(obj): def parse_numpy(obj):
try: try:
@ -57,50 +62,50 @@ class Message:
if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)): if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
return obj.isoformat() return obj.isoformat()
def default(self, obj): def default(self, o):
from platypush.procedure import Procedure from platypush.procedure import Procedure
value = self.parse_datetime(obj) value = self.parse_datetime(o)
if value is not None: if value is not None:
return value return value
if isinstance(obj, set): if isinstance(o, set):
return list(obj) return list(o)
if isinstance(obj, UUID): if isinstance(o, UUID):
return str(obj) return str(o)
value = self.parse_numpy(obj) value = self.parse_numpy(o)
if value is not None: if value is not None:
return value return value
if isinstance(obj, JSONAble): if isinstance(o, JSONAble):
return obj.to_json() return o.to_json()
if isinstance(obj, Procedure): if isinstance(o, Procedure):
return obj.to_dict() return o.to_dict()
if isinstance(obj, Enum): if isinstance(o, Enum):
return obj.value return o.value
if isinstance(obj, Exception): if isinstance(o, Exception):
return f'<{obj.__class__.__name__}>' + (f' {obj}' if obj else '') return f'<{o.__class__.__name__}>' + (f' {o}' if o else '')
if is_dataclass(obj): if is_dataclass(o):
return asdict(obj) return asdict(o)
if isinstance(obj, Message): if isinstance(o, Message):
return obj.to_dict(obj) return o.to_dict(o)
# Don't serialize I/O wrappers/objects # Don't serialize I/O wrappers/objects
if isinstance(obj, io.IOBase): if isinstance(o, io.IOBase):
return None return None
try: try:
return super().default(obj) return super().default(o)
except Exception as e: except Exception as e:
_logger.warning( _logger.warning(
'Could not serialize object type %s: %s: %s', type(obj), e, obj 'Could not serialize object type %s: %s: %s', type(o), e, o
) )
def __init__(self, *_, timestamp=None, logging_level=logging.INFO, **__): def __init__(self, *_, timestamp=None, logging_level=logging.INFO, **__):
@ -205,62 +210,4 @@ class Message:
return msgtype.build(msg) return msgtype.build(msg)
class Mapping(dict):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for k, v in kwargs.items():
self.__setattr__(k, v)
def __setitem__(self, key, item):
self.__dict__[key] = item
def __getitem__(self, key):
return self.__dict__[key]
def __repr__(self):
return repr(self.__dict__)
def __len__(self):
return len(self.__dict__)
def __delitem__(self, key):
del self.__dict__[key]
def clear(self):
return self.__dict__.clear()
def copy(self):
return self.__dict__.copy()
def has_key(self, k):
return k in self.__dict__
def update(self, *args, **kwargs):
return self.__dict__.update(*args, **kwargs)
def keys(self):
return self.__dict__.keys()
def values(self):
return self.__dict__.values()
def items(self):
return self.__dict__.items()
def pop(self, *args):
return self.__dict__.pop(*args)
def __cmp__(self, dict_):
return self.__cmp__(dict_)
def __contains__(self, item):
return item in self.__dict__
def __iter__(self):
return iter(self.__dict__)
def __str__(self):
return str(self.__dict__)
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

View file

@ -1,50 +0,0 @@
from typing import Optional
from platypush.message import Mapping
class EspWifiScanResult(Mapping):
def __init__(self,
essid: str,
bssid: str,
channel: int,
rssi: int,
auth_mode: int,
hidden: bool,
*args,
**kwargs):
self.essid = essid
self.bssid = bssid
self.channel = channel
self.rssi = rssi
self.auth_mode = auth_mode
self.hidden = hidden
super().__init__(*args, **dict(self), **kwargs)
class EspWifiConfigResult(Mapping):
def __init__(self,
ip: str,
netmask: str,
gateway: str,
dns: str,
mac: str,
active: bool,
essid: Optional[str] = None,
channel: Optional[int] = None,
hidden: Optional[bool] = None,
*args,
**kwargs):
self.ip = ip
self.netmask = netmask
self.gateway = gateway
self.dns = dns
self.mac = mac
self.active = active
self.essid = essid
self.channel = channel
self.hidden = hidden
super().__init__(*args, **dict(self), **kwargs)
# vim:sw=4:ts=4:et:

File diff suppressed because it is too large Load diff

View file

@ -1,30 +1,44 @@
from dataclasses import dataclass
from typing import Optional, List, Union from typing import Optional, List, Union
from platypush.message import Mapping
@dataclass
class Pin(Mapping): class Pin:
""" """
This class models the configuration for the PIN of a device. This class models the configuration for the PIN of a device.
""" """
def __init__(self, number: int, name: Optional[str] = None, pwm: bool = False, pull_up: bool = False):
super().__init__(number=number, name=name, pwm=pwm, pull_up=pull_up) number: int
name: Optional[str] = None
pwm: bool = False
pull_up: bool = False
class Device(Mapping): @dataclass
class Device:
""" """
This class models the properties of a configured ESP device. This class models the properties of a configured ESP device.
""" """
def __init__(self, host: str, port: int = 8266, password: Optional[str] = None,
name: Optional[str] = None, pins: List[Union[Pin, dict]] = None): host: str
pins = [ port: int = 8266
pin if isinstance(pin, Pin) else Pin(**pin) password: Optional[str] = None
for pin in (pins or []) name: Optional[str] = None
pins: Optional[List[Union[Pin, dict]]] = None
@property
def _pins(self):
return [
pin if isinstance(pin, Pin) else Pin(**pin) for pin in (self.pins or [])
] ]
super().__init__(host=host, port=port, password=password, pins=pins, name=name) @property
self._pin_by_name = {pin['name']: pin for pin in self['pins'] if pin['name']} def _pins_by_name(self):
self._pin_by_number = {pin['number']: pin for pin in self['pins']} return {pin.name: pin for pin in self._pins if pin.name}
@property
def _pins_by_number(self):
return {pin.number: pin for pin in self._pins}
def get_pin(self, pin) -> int: def get_pin(self, pin) -> int:
try: try:
@ -32,8 +46,9 @@ class Device(Mapping):
except ValueError: except ValueError:
pass pass
assert pin in self._pin_by_name, 'No such PIN configured: {}'.format(pin) pin_obj = self._pins_by_name.get(pin)
return self._pin_by_name[pin]['number'] assert pin_obj, f'No such PIN configured: {pin}'
return pin_obj.number
# vim:sw=4:ts=4:et: # vim:sw=4:ts=4:et:

148
platypush/schemas/esp.py Normal file
View file

@ -0,0 +1,148 @@
from marshmallow import EXCLUDE, fields
from marshmallow.schema import Schema
class WifiScanResultSchema(Schema):
"""
Schema for Wi-Fi scan results.
"""
class Meta: # type: ignore
"""
Exclude unknown fields.
"""
unknown = EXCLUDE
essid = fields.Str(
required=True,
metadata={
"description": "ESSID of the Wi-Fi network.",
"example": "MyNetwork",
},
)
bssid = fields.Str(
required=True,
metadata={
"description": "BSSID of the Wi-Fi network.",
"example": "00:11:22:33:44:55",
},
)
channel = fields.Int(
required=True,
metadata={
"description": "Channel of the Wi-Fi network.",
"example": 6,
},
)
rssi = fields.Int(
required=True,
metadata={
"description": "RSSI of the Wi-Fi network.",
"example": -50,
},
)
auth_mode = fields.Int(
required=True,
metadata={
"description": "Authentication mode of the Wi-Fi network.",
"example": 3,
},
)
hidden = fields.Bool(
required=True,
metadata={
"description": "Whether the Wi-Fi network is hidden.",
"example": False,
},
)
class WifiConfigSchema(Schema):
"""
Schema for Wi-Fi configuration.
"""
class Meta: # type: ignore
"""
Exclude unknown fields.
"""
unknown = EXCLUDE
ip = fields.Str(
required=True,
metadata={
"description": "IP address of the Wi-Fi interface.",
"example": "192.168.1.10",
},
)
netmask = fields.Str(
required=True,
metadata={
"description": "Netmask of the Wi-Fi network.",
"example": "255.255.255.0",
},
)
gateway = fields.Str(
required=True,
metadata={
"description": "Gateway of the Wi-Fi network.",
"example": "192.168.1.1",
},
)
dns = fields.Str(
required=True,
metadata={
"description": "DNS server of the Wi-Fi network.",
"example": "1.1.1.1",
},
)
mac = fields.Str(
required=True,
metadata={
"description": "MAC address of the Wi-Fi network.",
"example": "00:11:22:33:44:55",
},
)
active = fields.Bool(
required=True,
metadata={
"description": "Whether the Wi-Fi network is active.",
"example": True,
},
)
essid = fields.Str(
required=True,
metadata={
"description": "ESSID of the Wi-Fi network.",
"example": "MyNetwork",
},
)
channel = fields.Int(
required=True,
metadata={
"description": "Channel of the Wi-Fi network.",
"example": 6,
},
)
hidden = fields.Bool(
required=True,
metadata={
"description": "Whether the Wi-Fi network is hidden.",
"example": False,
},
)