diff --git a/platypush/plugins/switch/wemo/__init__.py b/platypush/plugins/switch/wemo/__init__.py index 462326671..2c92072fe 100644 --- a/platypush/plugins/switch/wemo/__init__.py +++ b/platypush/plugins/switch/wemo/__init__.py @@ -1,18 +1,10 @@ -import enum -import re -import textwrap - -from xml.dom.minidom import parseString -import requests +import ipaddress from platypush.plugins import action from platypush.plugins.switch import SwitchPlugin - - -class SwitchAction(enum.Enum): - GET_STATE = 'GetBinaryState' - SET_STATE = 'SetBinaryState' - GET_NAME = 'GetFriendlyName' +from platypush.utils.workers import Workers +from .lib import WemoRunner +from .scanner import Scanner class SwitchWemoPlugin(SwitchPlugin): @@ -27,19 +19,29 @@ class SwitchWemoPlugin(SwitchPlugin): _default_port = 49153 - def __init__(self, devices=None, **kwargs): + def __init__(self, devices=None, netmask: str = None, port: int = _default_port, **kwargs): """ :param devices: List of IP addresses or name->address map containing the WeMo Switch devices to control. This plugin previously used ouimeaux for auto-discovery but it's been dropped because 1. too slow 2. too heavy 3. auto-discovery failed too often. :type devices: list or dict + + :param netmask: Alternatively to a list of static IP->name pairs, you can specify the network mask where + the devices should be scanned (e.g. '192.168.1.0/24') + + :param port: Port where the WeMo devices are expected to expose the RPC/XML over HTTP service (default: 49153) """ super().__init__(**kwargs) - self._port = self._default_port + self.port = port + self.netmask = netmask + self._devices = {} + self._init_devices(devices) + + def _init_devices(self, devices): if devices: - self._devices = devices if isinstance(devices, dict) else \ - {addr: addr for addr in devices} + self._devices.update(devices if isinstance(devices, dict) else + {addr: addr for addr in devices}) else: self._devices = {} @@ -71,51 +73,25 @@ class SwitchWemoPlugin(SwitchPlugin): for device in self._devices.values() ] - # noinspection PyShadowingNames - def _exec(self, device: str, action: SwitchAction, port: int = _default_port, value=None): + def _get_address(self, device: str) -> str: if device not in self._addresses: try: - device = self._devices[device] + return self._devices[device] except KeyError: pass - state_name = action.value[3:] - - response = requests.post( - 'http://{}:{}/upnp/control/basicevent1'.format(device, port), - headers={ - 'User-Agent': '', - 'Accept': '', - 'Content-Type': 'text/xml; charset="utf-8"', - 'SOAPACTION': '\"urn:Belkin:service:basicevent:1#{}\"'.format(action.value), - }, - data=re.sub('\s+', ' ', textwrap.dedent( - ''' - - - - - <{state}>{value} - - - '''.format(action=action.value, state=state_name, - value=value if value is not None else '')))) - - dom = parseString(response.text) - return dom.getElementsByTagName(state_name).item(0).firstChild.data + return device @action - def status(self, device=None, *args, **kwargs): + def status(self, device: str = None, *args, **kwargs): devices = {device: device} if device else self._devices.copy() ret = [ { 'id': addr, 'ip': addr, - 'name': name if name != addr else self.get_name(addr).output, - 'on': self.get_state(addr).output, + 'name': name if name != addr else WemoRunner.get_name(addr), + 'on': WemoRunner.get_state(addr), } for (name, addr) in devices.items() ] @@ -129,7 +105,8 @@ class SwitchWemoPlugin(SwitchPlugin): :param device: Device name or address """ - self._exec(device=device, action=SwitchAction.SET_STATE, value=1) + device = self._get_address(device) + WemoRunner.on(device) return self.status(device) @action @@ -139,7 +116,8 @@ class SwitchWemoPlugin(SwitchPlugin): :param device: Device name or address """ - self._exec(device=device, action=SwitchAction.SET_STATE, value=0) + device = self._get_address(device) + WemoRunner.off(device) return self.status(device) @action @@ -149,8 +127,9 @@ class SwitchWemoPlugin(SwitchPlugin): :param device: Device name or address """ - state = self.get_state(device).output - return self.on(device) if not state else self.off(device) + device = self._get_address(device) + WemoRunner.toggle(device) + return self.status(device) @action def get_state(self, device: str): @@ -159,8 +138,8 @@ class SwitchWemoPlugin(SwitchPlugin): :param device: Device name or address """ - state = self._exec(device=device, action=SwitchAction.GET_STATE) - return bool(int(state)) + device = self._get_address(device) + return WemoRunner.get_state(device) @action def get_name(self, device: str): @@ -169,7 +148,26 @@ class SwitchWemoPlugin(SwitchPlugin): :param device: Device name or address """ - return self._exec(device=device, action=SwitchAction.GET_NAME) + device = self._get_address(device) + return WemoRunner.get_name(device) + + @action + def scan(self, netmask: str = None): + netmask = netmask or self.netmask + assert netmask, 'Scan not supported: No netmask specified' + + workers = Workers(10, Scanner, port=self.port) + with workers: + for addr in ipaddress.IPv4Network(netmask): + workers.put(addr.exploded) + + devices = { + dev.name: dev.addr + for dev in workers.responses + } + + self._init_devices(devices) + return self.status() # vim:sw=4:ts=4:et: diff --git a/platypush/plugins/switch/wemo/lib.py b/platypush/plugins/switch/wemo/lib.py new file mode 100644 index 000000000..b2bd0b127 --- /dev/null +++ b/platypush/plugins/switch/wemo/lib.py @@ -0,0 +1,71 @@ +import enum +import re +import textwrap + +from xml.dom.minidom import parseString +import requests + + +class SwitchAction(enum.Enum): + GET_STATE = 'GetBinaryState' + SET_STATE = 'SetBinaryState' + GET_NAME = 'GetFriendlyName' + + +class WemoRunner: + default_port = 49153 + + @staticmethod + def exec(device: str, action: SwitchAction, port: int = default_port, value=None): + state_name = action.value[3:] + + response = requests.post( + 'http://{}:{}/upnp/control/basicevent1'.format(device, port), + headers={ + 'User-Agent': '', + 'Accept': '', + 'Content-Type': 'text/xml; charset="utf-8"', + 'SOAPACTION': '\"urn:Belkin:service:basicevent:1#{}\"'.format(action.value), + }, + data=re.sub('\s+', ' ', textwrap.dedent( + ''' + + + + + <{state}>{value} + + + '''.format(action=action.value, state=state_name, + value=value if value is not None else '')))) + + dom = parseString(response.text) + return dom.getElementsByTagName(state_name).item(0).firstChild.data + + @classmethod + def get_state(cls, device: str) -> bool: + state = cls.exec(device=device, action=SwitchAction.GET_STATE) + return bool(int(state)) + + @classmethod + def get_name(cls, device: str) -> str: + name = cls.exec(device=device, action=SwitchAction.GET_NAME) + return name + + @classmethod + def on(cls, device): + cls.exec(device=device, action=SwitchAction.SET_STATE, value=1) + + @classmethod + def off(cls, device): + cls.exec(device=device, action=SwitchAction.SET_STATE, value=0) + + @classmethod + def toggle(cls, device): + state = cls.get_state(device) + cls.exec(device=device, action=SwitchAction.SET_STATE, value=int(not state)) + + +# vim:sw=4:ts=4:et: diff --git a/platypush/plugins/switch/wemo/scanner.py b/platypush/plugins/switch/wemo/scanner.py new file mode 100644 index 000000000..3c51f29ed --- /dev/null +++ b/platypush/plugins/switch/wemo/scanner.py @@ -0,0 +1,37 @@ +import multiprocessing +import socket + +from typing import Optional + +from platypush.utils.workers import Worker +from .lib import WemoRunner + + +class ScanResult: + def __init__(self, addr: str, name: str, on: bool): + self.addr = addr + self.name = name + self.on = on + + +class Scanner(Worker): + timeout = 1.5 + + def __init__(self, request_queue: multiprocessing.Queue, response_queue: multiprocessing.Queue, + port: int = WemoRunner.default_port): + super().__init__(request_queue, response_queue) + self.port = port + + def process(self, addr: str) -> Optional[ScanResult]: + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(self.timeout) + sock.connect((addr, self.port)) + sock.close() + + return ScanResult(addr=addr, name=WemoRunner.get_name(addr), on=WemoRunner.get_state(addr)) + except OSError: + pass + + +# vim:sw=4:ts=4:et: diff --git a/platypush/utils/workers.py b/platypush/utils/workers.py new file mode 100644 index 000000000..37cd63581 --- /dev/null +++ b/platypush/utils/workers.py @@ -0,0 +1,140 @@ +import multiprocessing + +from abc import ABC, abstractmethod +from typing import Type + + +class EndOfStream: + pass + + +class Worker(ABC, multiprocessing.Process): + """ + Generic class for worker processes, used to split the execution of an action over multiple + parallel instances. + """ + + def __init__(self, request_queue: multiprocessing.Queue, response_queue=multiprocessing.Queue): + """ + :param request_queue: The worker will listen for messages to process over this queue + :param response_queue: The worker will return responses over this queue + """ + super().__init__() + self.request_queue = request_queue + self.response_queue = response_queue + + def run(self) -> None: + """ + The worker will run until it receives a :class:`EndOfStream` message on the queue. + """ + while True: + msg = self.request_queue.get() + if isinstance(msg, EndOfStream): + break + + try: + ret = self.process(msg) + except Exception as e: + ret = e + + if ret: + # noinspection PyArgumentList,PyCallByClass + self.response_queue.put(ret) + + @abstractmethod + def process(self, msg): + """ + This method must be implemented by the derived classes. + It will take as argument a message received over the `request_queue` and will return a value that will be + processed by the consumer or None. + + If this function raises an exception then the exception will be pushed to the response queue and can be + handled by the consumer. + """ + raise NotImplementedError('Must be implemented by a derived class') + + def end_stream(self) -> None: + """ + This method will be called when we have no more messages to send to the worker. A special + `EndOfStream` object will be sent on the `request_queue`. + """ + self.request_queue.put(EndOfStream()) + + +class Workers: + """ + Model for a pool of workers. Syntax: + + .. code-block:: python + + class Squarer(Worker): + def process(self, n): + return n * n + + workers = Workers(5, Squarer) # Allocate 5 workers of type Squarer + with workers: + for n in range(100)): + workers.put(n) + + print(workers.responses) + + """ + + def __init__(self, n_workers: int, worker_type: Type[Worker], *args, **kwargs): + """ + :param n_workers: Number of workers + :param worker_type: Type of the workers that will be instantiated. Must be a subclass of :class:`Worker`. + :param args: Extra args to pass to the `worker_type` constructor + :param kwargs: Extra kwargs to pass to the `worker_type` constructor + """ + self.request_queue = multiprocessing.Queue() + self.response_queue = multiprocessing.Queue() + # noinspection PyArgumentList + self._workers = [worker_type(self.request_queue, self.response_queue, *args, **kwargs) + for _ in range(n_workers)] + self.responses: list = [] + + def start(self): + for wrk in self._workers: + wrk.start() + + def put(self, msg) -> None: + """ + Put a message on the `request_queue` + """ + self.request_queue.put(msg) + + def wait(self) -> list: + """ + Wait for the termination of all the workers + + :ret: A list containing the processed responses + """ + while self._workers: + for i, wrk in enumerate(self._workers): + if not self._workers[i].is_alive(): + self._workers.pop(i) + break + + self.responses = [] + while not self.response_queue.empty(): + self.responses.append(self.response_queue.get()) + return self.responses + + def end_stream(self): + """ + Mark the termination of the stream by sending an :class:`EndOfStream` message on the `request_queue` for + each of the running workers. + """ + for wrk in self._workers: + wrk.end_stream() + + def __enter__(self): + self.start() + + def __exit__(self, exc_type, exc_val, exc_tb): + self.end_stream() + self.wait() + + +# vim:sw=4:ts=4:et: