From 7eca1c27c9d2308f182a05bb4a1ca950af1c213b Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <fabio@manganiello.tech>
Date: Mon, 22 May 2023 02:33:54 +0200
Subject: [PATCH] Blackened the qrcode and pushbullet plugins

---
 platypush/plugins/pushbullet/__init__.py | 158 ++++++++++++++---------
 platypush/plugins/qrcode/__init__.py     |  66 +++++++---
 2 files changed, 144 insertions(+), 80 deletions(-)

diff --git a/platypush/plugins/pushbullet/__init__.py b/platypush/plugins/pushbullet/__init__.py
index 65ccbdc24..ab2c6ab21 100644
--- a/platypush/plugins/pushbullet/__init__.py
+++ b/platypush/plugins/pushbullet/__init__.py
@@ -16,7 +16,8 @@ class PushbulletPlugin(Plugin):
 
     Requires:
 
-        * The :class:`platypush.backend.pushbullet.Pushbullet` backend enabled
+        * The :class:`platypush.backend.pushbullet.PushbulletBackend` backend enabled
+
     """
 
     def __init__(self, token: Optional[str] = None, **kwargs):
@@ -44,23 +45,23 @@ class PushbulletPlugin(Plugin):
         """
         Get the list of available devices
         """
-        resp = requests.get('https://api.pushbullet.com/v2/devices',
-                            headers={'Authorization': 'Bearer ' + self.token,
-                                     'Content-Type': 'application/json'})
+        resp = requests.get(
+            'https://api.pushbullet.com/v2/devices',
+            headers={
+                'Authorization': 'Bearer ' + self.token,
+                'Content-Type': 'application/json',
+            },
+        )
 
         self._devices = resp.json().get('devices', [])
-        self._devices_by_id = {
-            dev['iden']: dev
-            for dev in self._devices
-        }
+        self._devices_by_id = {dev['iden']: dev for dev in self._devices}
 
         self._devices_by_name = {
-            dev['nickname']: dev
-            for dev in self._devices if 'nickname' in dev
+            dev['nickname']: dev for dev in self._devices if 'nickname' in dev
         }
 
     @action
-    def get_device(self, device):
+    def get_device(self, device) -> Optional[dict]:
         """
         :param device: Device ID or name
         """
@@ -79,7 +80,14 @@ class PushbulletPlugin(Plugin):
             refreshed = True
 
     @action
-    def send_note(self, device: str = None, body: str = None, title: str = None, url: str = None, **kwargs):
+    def send_note(
+        self,
+        device: Optional[str] = None,
+        body: Optional[str] = None,
+        title: Optional[str] = None,
+        url: Optional[str] = None,
+        **kwargs,
+    ):
         """
         Send a note push.
 
@@ -90,10 +98,11 @@ class PushbulletPlugin(Plugin):
         :param kwargs: Push arguments, see https://docs.pushbullet.com/#create-push
         """
 
+        dev = None
         if device:
-            device = self.get_device(device).output
-            if not device:
-                raise RuntimeError('No such device')
+            dev = self.get_device(device).output
+            if not dev:
+                raise RuntimeError(f'No such device: {device}')
 
         kwargs['body'] = body
         kwargs['title'] = title
@@ -102,21 +111,25 @@ class PushbulletPlugin(Plugin):
             kwargs['type'] = 'link'
             kwargs['url'] = url
 
-        if device:
-            # noinspection PyTypeChecker
-            kwargs['device_iden'] = device['iden']
+        if dev:
+            kwargs['device_iden'] = dev['iden']
 
-        resp = requests.post('https://api.pushbullet.com/v2/pushes',
-                             data=json.dumps(kwargs),
-                             headers={'Authorization': 'Bearer ' + self.token,
-                                      'Content-Type': 'application/json'})
+        resp = requests.post(
+            'https://api.pushbullet.com/v2/pushes',
+            data=json.dumps(kwargs),
+            headers={
+                'Authorization': 'Bearer ' + self.token,
+                'Content-Type': 'application/json',
+            },
+        )
 
         if resp.status_code >= 400:
-            raise Exception('Pushbullet push failed with status {}: {}'.
-                            format(resp.status_code, resp.json()))
+            raise Exception(
+                f'Pushbullet push failed with status {resp.status_code}: {resp.json()}'
+            )
 
     @action
-    def send_file(self, filename: str, device: str = None):
+    def send_file(self, filename: str, device: Optional[str] = None):
         """
         Send a file.
 
@@ -124,48 +137,61 @@ class PushbulletPlugin(Plugin):
         :param filename: Path to the local file
         """
 
+        dev = None
         if device:
-            device = self.get_device(device).output
-            if not device:
-                raise RuntimeError('No such device')
+            dev = self.get_device(device).output
+            if not dev:
+                raise RuntimeError(f'No such device: {device}')
 
-        resp = requests.post('https://api.pushbullet.com/v2/upload-request',
-                             data=json.dumps({'file_name': os.path.basename(filename)}),
-                             headers={'Authorization': 'Bearer ' + self.token,
-                                      'Content-Type': 'application/json'})
+        resp = requests.post(
+            'https://api.pushbullet.com/v2/upload-request',
+            data=json.dumps({'file_name': os.path.basename(filename)}),
+            headers={
+                'Authorization': 'Bearer ' + self.token,
+                'Content-Type': 'application/json',
+            },
+        )
 
         if resp.status_code != 200:
-            raise Exception('Pushbullet file upload request failed with status {}'.
-                            format(resp.status_code))
+            raise Exception(
+                f'Pushbullet file upload request failed with status {resp.status_code}'
+            )
 
         r = resp.json()
-        resp = requests.post(r['upload_url'], data=r['data'],
-                             files={'file': open(filename, 'rb')})
+        with open(filename, 'rb') as f:
+            resp = requests.post(r['upload_url'], data=r['data'], files={'file': f})
 
         if resp.status_code != 204:
-            raise Exception('Pushbullet file upload failed with status {}'.
-                            format(resp.status_code))
+            raise Exception(
+                f'Pushbullet file upload failed with status {resp.status_code}'
+            )
 
-        # noinspection PyTypeChecker
-        resp = requests.post('https://api.pushbullet.com/v2/pushes',
-                             headers={'Authorization': 'Bearer ' + self.token,
-                                      'Content-Type': 'application/json'},
-
-                             data=json.dumps({
-                                 'type': 'file',
-                                 'device_iden': device['iden'] if device else None,
-                                 'file_name': r['file_name'],
-                                 'file_type': r['file_type'],
-                                 'file_url': r['file_url']}))
+        resp = requests.post(
+            'https://api.pushbullet.com/v2/pushes',
+            headers={
+                'Authorization': 'Bearer ' + self.token,
+                'Content-Type': 'application/json',
+            },
+            data=json.dumps(
+                {
+                    'type': 'file',
+                    'device_iden': dev['iden'] if dev else None,
+                    'file_name': r['file_name'],
+                    'file_type': r['file_type'],
+                    'file_url': r['file_url'],
+                }
+            ),
+        )
 
         if resp.status_code >= 400:
-            raise Exception('Pushbullet file push failed with status {}'.
-                            format(resp.status_code))
+            raise Exception(
+                f'Pushbullet file push failed with status {resp.status_code}'
+            )
 
         return {
             'filename': r['file_name'],
             'type': r['file_type'],
-            'url': r['file_url']
+            'url': r['file_url'],
         }
 
     @action
@@ -178,17 +204,23 @@ class PushbulletPlugin(Plugin):
         backend = get_backend('pushbullet')
         device_id = backend.get_device_id() if backend else None
 
-        resp = requests.post('https://api.pushbullet.com/v2/ephemerals',
-                             data=json.dumps({
-                                 'type': 'push',
-                                 'push': {
-                                     'body': text,
-                                     'type': 'clip',
-                                     'source_device_iden': device_id,
-                                 },
-                             }),
-                             headers={'Authorization': 'Bearer ' + self.token,
-                                      'Content-Type': 'application/json'})
+        resp = requests.post(
+            'https://api.pushbullet.com/v2/ephemerals',
+            data=json.dumps(
+                {
+                    'type': 'push',
+                    'push': {
+                        'body': text,
+                        'type': 'clip',
+                        'source_device_iden': device_id,
+                    },
+                }
+            ),
+            headers={
+                'Authorization': 'Bearer ' + self.token,
+                'Content-Type': 'application/json',
+            },
+        )
 
         resp.raise_for_status()
 
diff --git a/platypush/plugins/qrcode/__init__.py b/platypush/plugins/qrcode/__init__.py
index e97fb3657..93bfc1d47 100644
--- a/platypush/plugins/qrcode/__init__.py
+++ b/platypush/plugins/qrcode/__init__.py
@@ -8,7 +8,11 @@ from typing import Optional, List
 from platypush import Config
 from platypush.context import get_bus
 from platypush.message.event.qrcode import QrcodeScannedEvent
-from platypush.message.response.qrcode import QrcodeGeneratedResponse, QrcodeDecodedResponse, ResultModel
+from platypush.message.response.qrcode import (
+    QrcodeGeneratedResponse,
+    QrcodeDecodedResponse,
+    ResultModel,
+)
 from platypush.plugins import Plugin, action
 from platypush.plugins.camera import CameraPlugin
 from platypush.utils import get_plugin_class_by_name
@@ -36,20 +40,28 @@ class QrcodePlugin(Plugin):
         self.camera_plugin = camera_plugin
         self._capturing = threading.Event()
 
-    def _get_camera(self, camera_plugin: Optional[str] = None, **config) -> CameraPlugin:
+    def _get_camera(
+        self, camera_plugin: Optional[str] = None, **config
+    ) -> CameraPlugin:
         camera_plugin = camera_plugin or self.camera_plugin
         if not config:
             config = Config.get(camera_plugin) or {}
         config['stream_raw_frames'] = True
 
         cls = get_plugin_class_by_name(camera_plugin)
-        assert cls and issubclass(cls, CameraPlugin), '{} is not a valid camera plugin'.format(camera_plugin)
+        assert cls and issubclass(
+            cls, CameraPlugin
+        ), '{} is not a valid camera plugin'.format(camera_plugin)
         return cls(**config)
 
-    # noinspection PyShadowingBuiltins
     @action
-    def generate(self, content: str, output_file: Optional[str] = None, show: bool = False,
-                 format: str = 'png') -> QrcodeGeneratedResponse:
+    def generate(
+        self,
+        content: str,
+        output_file: Optional[str] = None,
+        show: bool = False,
+        format: str = 'png',
+    ) -> QrcodeGeneratedResponse:
         """
         Generate a QR code.
         If you configured the :class:`platypush.backend.http.HttpBackend` then you can also generate
@@ -65,6 +77,7 @@ class QrcodePlugin(Plugin):
         :return: :class:`platypush.message.response.qrcode.QrcodeGeneratedResponse`.
         """
         import qrcode
+
         qr = qrcode.make(content)
         img = qr.get_image()
         ret = {
@@ -105,17 +118,26 @@ class QrcodePlugin(Plugin):
         import numpy as np
         from PIL import Image
 
-        assert isinstance(frame, np.ndarray), \
-            'Image conversion only works with numpy arrays for now (got {})'.format(type(frame))
+        assert isinstance(
+            frame, np.ndarray
+        ), 'Image conversion only works with numpy arrays for now (got {})'.format(
+            type(frame)
+        )
         mode = 'RGB'
         if len(frame.shape) > 2 and frame.shape[2] == 4:
             mode = 'RGBA'
 
-        return Image.frombuffer(mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1)
+        return Image.frombuffer(
+            mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1
+        )
 
     @action
-    def start_scanning(self, camera_plugin: Optional[str] = None, duration: Optional[float] = None,
-                       n_codes: Optional[int] = None) -> Optional[List[ResultModel]]:
+    def start_scanning(
+        self,
+        camera_plugin: Optional[str] = None,
+        duration: Optional[float] = None,
+        n_codes: Optional[int] = None,
+    ) -> Optional[List[ResultModel]]:
         """
         Decode QR-codes and bar codes using a camera.
 
@@ -130,6 +152,7 @@ class QrcodePlugin(Plugin):
             :class:`platypush.message.response.qrcode.ResultModel` instances with the scanned results,
         """
         from pyzbar import pyzbar
+
         assert not self._capturing.is_set(), 'A capturing process is already running'
 
         camera = self._get_camera(camera_plugin)
@@ -143,9 +166,11 @@ class QrcodePlugin(Plugin):
             with camera:
                 start_time = time.time()
 
-                while self._capturing.is_set() \
-                        and (not duration or time.time() < start_time + duration) \
-                        and (not n_codes or len(codes) < n_codes):
+                while (
+                    self._capturing.is_set()
+                    and (not duration or time.time() < start_time + duration)
+                    and (not n_codes or len(codes) < n_codes)
+                ):
                     output = camera.get_stream()
                     with output.ready:
                         output.ready.wait()
@@ -153,15 +178,21 @@ class QrcodePlugin(Plugin):
                         results = pyzbar.decode(img)
                         if results:
                             results = [
-                                result for result in QrcodeDecodedResponse(results).output['results']
+                                result
+                                for result in QrcodeDecodedResponse(results).output[
+                                    'results'
+                                ]
                                 if result['data'] not in last_results
-                                   or time.time() >= last_results_time + last_results_timeout
+                                or time.time()
+                                >= last_results_time + last_results_timeout
                             ]
 
                         if results:
                             codes.extend(results)
                             get_bus().post(QrcodeScannedEvent(results=results))
-                            last_results = {result['data']: result for result in results}
+                            last_results = {
+                                result['data']: result for result in results
+                            }
                             last_results_time = time.time()
         finally:
             self._capturing.clear()
@@ -172,4 +203,5 @@ class QrcodePlugin(Plugin):
     def stop_scanning(self):
         self._capturing.clear()
 
+
 # vim:sw=4:ts=4:et: