diff --git a/platypush/plugins/pushbullet/__init__.py b/platypush/plugins/pushbullet/__init__.py index 65ccbdc24f..ab2c6ab21d 100644 --- a/platypush/plugins/pushbullet/__init__.py +++ b/platypush/plugins/pushbullet/__init__.py @@ -16,7 +16,8 @@ class PushbulletPlugin(Plugin): Requires: - * The :class:`platypush.backend.pushbullet.Pushbullet` backend enabled + * The :class:`platypush.backend.pushbullet.PushbulletBackend` backend enabled + """ def __init__(self, token: Optional[str] = None, **kwargs): @@ -44,23 +45,23 @@ class PushbulletPlugin(Plugin): """ Get the list of available devices """ - resp = requests.get('https://api.pushbullet.com/v2/devices', - headers={'Authorization': 'Bearer ' + self.token, - 'Content-Type': 'application/json'}) + resp = requests.get( + 'https://api.pushbullet.com/v2/devices', + headers={ + 'Authorization': 'Bearer ' + self.token, + 'Content-Type': 'application/json', + }, + ) self._devices = resp.json().get('devices', []) - self._devices_by_id = { - dev['iden']: dev - for dev in self._devices - } + self._devices_by_id = {dev['iden']: dev for dev in self._devices} self._devices_by_name = { - dev['nickname']: dev - for dev in self._devices if 'nickname' in dev + dev['nickname']: dev for dev in self._devices if 'nickname' in dev } @action - def get_device(self, device): + def get_device(self, device) -> Optional[dict]: """ :param device: Device ID or name """ @@ -79,7 +80,14 @@ class PushbulletPlugin(Plugin): refreshed = True @action - def send_note(self, device: str = None, body: str = None, title: str = None, url: str = None, **kwargs): + def send_note( + self, + device: Optional[str] = None, + body: Optional[str] = None, + title: Optional[str] = None, + url: Optional[str] = None, + **kwargs, + ): """ Send a note push. @@ -90,10 +98,11 @@ class PushbulletPlugin(Plugin): :param kwargs: Push arguments, see https://docs.pushbullet.com/#create-push """ + dev = None if device: - device = self.get_device(device).output - if not device: - raise RuntimeError('No such device') + dev = self.get_device(device).output + if not dev: + raise RuntimeError(f'No such device: {device}') kwargs['body'] = body kwargs['title'] = title @@ -102,21 +111,25 @@ class PushbulletPlugin(Plugin): kwargs['type'] = 'link' kwargs['url'] = url - if device: - # noinspection PyTypeChecker - kwargs['device_iden'] = device['iden'] + if dev: + kwargs['device_iden'] = dev['iden'] - resp = requests.post('https://api.pushbullet.com/v2/pushes', - data=json.dumps(kwargs), - headers={'Authorization': 'Bearer ' + self.token, - 'Content-Type': 'application/json'}) + resp = requests.post( + 'https://api.pushbullet.com/v2/pushes', + data=json.dumps(kwargs), + headers={ + 'Authorization': 'Bearer ' + self.token, + 'Content-Type': 'application/json', + }, + ) if resp.status_code >= 400: - raise Exception('Pushbullet push failed with status {}: {}'. - format(resp.status_code, resp.json())) + raise Exception( + f'Pushbullet push failed with status {resp.status_code}: {resp.json()}' + ) @action - def send_file(self, filename: str, device: str = None): + def send_file(self, filename: str, device: Optional[str] = None): """ Send a file. @@ -124,48 +137,61 @@ class PushbulletPlugin(Plugin): :param filename: Path to the local file """ + dev = None if device: - device = self.get_device(device).output - if not device: - raise RuntimeError('No such device') + dev = self.get_device(device).output + if not dev: + raise RuntimeError(f'No such device: {device}') - resp = requests.post('https://api.pushbullet.com/v2/upload-request', - data=json.dumps({'file_name': os.path.basename(filename)}), - headers={'Authorization': 'Bearer ' + self.token, - 'Content-Type': 'application/json'}) + resp = requests.post( + 'https://api.pushbullet.com/v2/upload-request', + data=json.dumps({'file_name': os.path.basename(filename)}), + headers={ + 'Authorization': 'Bearer ' + self.token, + 'Content-Type': 'application/json', + }, + ) if resp.status_code != 200: - raise Exception('Pushbullet file upload request failed with status {}'. - format(resp.status_code)) + raise Exception( + f'Pushbullet file upload request failed with status {resp.status_code}' + ) r = resp.json() - resp = requests.post(r['upload_url'], data=r['data'], - files={'file': open(filename, 'rb')}) + with open(filename, 'rb') as f: + resp = requests.post(r['upload_url'], data=r['data'], files={'file': f}) if resp.status_code != 204: - raise Exception('Pushbullet file upload failed with status {}'. - format(resp.status_code)) + raise Exception( + f'Pushbullet file upload failed with status {resp.status_code}' + ) - # noinspection PyTypeChecker - resp = requests.post('https://api.pushbullet.com/v2/pushes', - headers={'Authorization': 'Bearer ' + self.token, - 'Content-Type': 'application/json'}, - - data=json.dumps({ - 'type': 'file', - 'device_iden': device['iden'] if device else None, - 'file_name': r['file_name'], - 'file_type': r['file_type'], - 'file_url': r['file_url']})) + resp = requests.post( + 'https://api.pushbullet.com/v2/pushes', + headers={ + 'Authorization': 'Bearer ' + self.token, + 'Content-Type': 'application/json', + }, + data=json.dumps( + { + 'type': 'file', + 'device_iden': dev['iden'] if dev else None, + 'file_name': r['file_name'], + 'file_type': r['file_type'], + 'file_url': r['file_url'], + } + ), + ) if resp.status_code >= 400: - raise Exception('Pushbullet file push failed with status {}'. - format(resp.status_code)) + raise Exception( + f'Pushbullet file push failed with status {resp.status_code}' + ) return { 'filename': r['file_name'], 'type': r['file_type'], - 'url': r['file_url'] + 'url': r['file_url'], } @action @@ -178,17 +204,23 @@ class PushbulletPlugin(Plugin): backend = get_backend('pushbullet') device_id = backend.get_device_id() if backend else None - resp = requests.post('https://api.pushbullet.com/v2/ephemerals', - data=json.dumps({ - 'type': 'push', - 'push': { - 'body': text, - 'type': 'clip', - 'source_device_iden': device_id, - }, - }), - headers={'Authorization': 'Bearer ' + self.token, - 'Content-Type': 'application/json'}) + resp = requests.post( + 'https://api.pushbullet.com/v2/ephemerals', + data=json.dumps( + { + 'type': 'push', + 'push': { + 'body': text, + 'type': 'clip', + 'source_device_iden': device_id, + }, + } + ), + headers={ + 'Authorization': 'Bearer ' + self.token, + 'Content-Type': 'application/json', + }, + ) resp.raise_for_status() diff --git a/platypush/plugins/qrcode/__init__.py b/platypush/plugins/qrcode/__init__.py index e97fb36573..93bfc1d47d 100644 --- a/platypush/plugins/qrcode/__init__.py +++ b/platypush/plugins/qrcode/__init__.py @@ -8,7 +8,11 @@ from typing import Optional, List from platypush import Config from platypush.context import get_bus from platypush.message.event.qrcode import QrcodeScannedEvent -from platypush.message.response.qrcode import QrcodeGeneratedResponse, QrcodeDecodedResponse, ResultModel +from platypush.message.response.qrcode import ( + QrcodeGeneratedResponse, + QrcodeDecodedResponse, + ResultModel, +) from platypush.plugins import Plugin, action from platypush.plugins.camera import CameraPlugin from platypush.utils import get_plugin_class_by_name @@ -36,20 +40,28 @@ class QrcodePlugin(Plugin): self.camera_plugin = camera_plugin self._capturing = threading.Event() - def _get_camera(self, camera_plugin: Optional[str] = None, **config) -> CameraPlugin: + def _get_camera( + self, camera_plugin: Optional[str] = None, **config + ) -> CameraPlugin: camera_plugin = camera_plugin or self.camera_plugin if not config: config = Config.get(camera_plugin) or {} config['stream_raw_frames'] = True cls = get_plugin_class_by_name(camera_plugin) - assert cls and issubclass(cls, CameraPlugin), '{} is not a valid camera plugin'.format(camera_plugin) + assert cls and issubclass( + cls, CameraPlugin + ), '{} is not a valid camera plugin'.format(camera_plugin) return cls(**config) - # noinspection PyShadowingBuiltins @action - def generate(self, content: str, output_file: Optional[str] = None, show: bool = False, - format: str = 'png') -> QrcodeGeneratedResponse: + def generate( + self, + content: str, + output_file: Optional[str] = None, + show: bool = False, + format: str = 'png', + ) -> QrcodeGeneratedResponse: """ Generate a QR code. If you configured the :class:`platypush.backend.http.HttpBackend` then you can also generate @@ -65,6 +77,7 @@ class QrcodePlugin(Plugin): :return: :class:`platypush.message.response.qrcode.QrcodeGeneratedResponse`. """ import qrcode + qr = qrcode.make(content) img = qr.get_image() ret = { @@ -105,17 +118,26 @@ class QrcodePlugin(Plugin): import numpy as np from PIL import Image - assert isinstance(frame, np.ndarray), \ - 'Image conversion only works with numpy arrays for now (got {})'.format(type(frame)) + assert isinstance( + frame, np.ndarray + ), 'Image conversion only works with numpy arrays for now (got {})'.format( + type(frame) + ) mode = 'RGB' if len(frame.shape) > 2 and frame.shape[2] == 4: mode = 'RGBA' - return Image.frombuffer(mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1) + return Image.frombuffer( + mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1 + ) @action - def start_scanning(self, camera_plugin: Optional[str] = None, duration: Optional[float] = None, - n_codes: Optional[int] = None) -> Optional[List[ResultModel]]: + def start_scanning( + self, + camera_plugin: Optional[str] = None, + duration: Optional[float] = None, + n_codes: Optional[int] = None, + ) -> Optional[List[ResultModel]]: """ Decode QR-codes and bar codes using a camera. @@ -130,6 +152,7 @@ class QrcodePlugin(Plugin): :class:`platypush.message.response.qrcode.ResultModel` instances with the scanned results, """ from pyzbar import pyzbar + assert not self._capturing.is_set(), 'A capturing process is already running' camera = self._get_camera(camera_plugin) @@ -143,9 +166,11 @@ class QrcodePlugin(Plugin): with camera: start_time = time.time() - while self._capturing.is_set() \ - and (not duration or time.time() < start_time + duration) \ - and (not n_codes or len(codes) < n_codes): + while ( + self._capturing.is_set() + and (not duration or time.time() < start_time + duration) + and (not n_codes or len(codes) < n_codes) + ): output = camera.get_stream() with output.ready: output.ready.wait() @@ -153,15 +178,21 @@ class QrcodePlugin(Plugin): results = pyzbar.decode(img) if results: results = [ - result for result in QrcodeDecodedResponse(results).output['results'] + result + for result in QrcodeDecodedResponse(results).output[ + 'results' + ] if result['data'] not in last_results - or time.time() >= last_results_time + last_results_timeout + or time.time() + >= last_results_time + last_results_timeout ] if results: codes.extend(results) get_bus().post(QrcodeScannedEvent(results=results)) - last_results = {result['data']: result for result in results} + last_results = { + result['data']: result for result in results + } last_results_time = time.time() finally: self._capturing.clear() @@ -172,4 +203,5 @@ class QrcodePlugin(Plugin): def stop_scanning(self): self._capturing.clear() + # vim:sw=4:ts=4:et: