Blackened the qrcode and pushbullet plugins

This commit is contained in:
Fabio Manganiello 2023-05-22 02:33:54 +02:00
parent aaac6488d6
commit 7eca1c27c9
Signed by untrusted user: blacklight
GPG key ID: D90FBA7F76362774
2 changed files with 144 additions and 80 deletions

View file

@ -16,7 +16,8 @@ class PushbulletPlugin(Plugin):
Requires:
* The :class:`platypush.backend.pushbullet.Pushbullet` backend enabled
* The :class:`platypush.backend.pushbullet.PushbulletBackend` backend enabled
"""
def __init__(self, token: Optional[str] = None, **kwargs):
@ -44,23 +45,23 @@ class PushbulletPlugin(Plugin):
"""
Get the list of available devices
"""
resp = requests.get('https://api.pushbullet.com/v2/devices',
headers={'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json'})
resp = requests.get(
'https://api.pushbullet.com/v2/devices',
headers={
'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json',
},
)
self._devices = resp.json().get('devices', [])
self._devices_by_id = {
dev['iden']: dev
for dev in self._devices
}
self._devices_by_id = {dev['iden']: dev for dev in self._devices}
self._devices_by_name = {
dev['nickname']: dev
for dev in self._devices if 'nickname' in dev
dev['nickname']: dev for dev in self._devices if 'nickname' in dev
}
@action
def get_device(self, device):
def get_device(self, device) -> Optional[dict]:
"""
:param device: Device ID or name
"""
@ -79,7 +80,14 @@ class PushbulletPlugin(Plugin):
refreshed = True
@action
def send_note(self, device: str = None, body: str = None, title: str = None, url: str = None, **kwargs):
def send_note(
self,
device: Optional[str] = None,
body: Optional[str] = None,
title: Optional[str] = None,
url: Optional[str] = None,
**kwargs,
):
"""
Send a note push.
@ -90,10 +98,11 @@ class PushbulletPlugin(Plugin):
:param kwargs: Push arguments, see https://docs.pushbullet.com/#create-push
"""
dev = None
if device:
device = self.get_device(device).output
if not device:
raise RuntimeError('No such device')
dev = self.get_device(device).output
if not dev:
raise RuntimeError(f'No such device: {device}')
kwargs['body'] = body
kwargs['title'] = title
@ -102,21 +111,25 @@ class PushbulletPlugin(Plugin):
kwargs['type'] = 'link'
kwargs['url'] = url
if device:
# noinspection PyTypeChecker
kwargs['device_iden'] = device['iden']
if dev:
kwargs['device_iden'] = dev['iden']
resp = requests.post('https://api.pushbullet.com/v2/pushes',
resp = requests.post(
'https://api.pushbullet.com/v2/pushes',
data=json.dumps(kwargs),
headers={'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json'})
headers={
'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json',
},
)
if resp.status_code >= 400:
raise Exception('Pushbullet push failed with status {}: {}'.
format(resp.status_code, resp.json()))
raise Exception(
f'Pushbullet push failed with status {resp.status_code}: {resp.json()}'
)
@action
def send_file(self, filename: str, device: str = None):
def send_file(self, filename: str, device: Optional[str] = None):
"""
Send a file.
@ -124,48 +137,61 @@ class PushbulletPlugin(Plugin):
:param filename: Path to the local file
"""
dev = None
if device:
device = self.get_device(device).output
if not device:
raise RuntimeError('No such device')
dev = self.get_device(device).output
if not dev:
raise RuntimeError(f'No such device: {device}')
resp = requests.post('https://api.pushbullet.com/v2/upload-request',
resp = requests.post(
'https://api.pushbullet.com/v2/upload-request',
data=json.dumps({'file_name': os.path.basename(filename)}),
headers={'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json'})
headers={
'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json',
},
)
if resp.status_code != 200:
raise Exception('Pushbullet file upload request failed with status {}'.
format(resp.status_code))
raise Exception(
f'Pushbullet file upload request failed with status {resp.status_code}'
)
r = resp.json()
resp = requests.post(r['upload_url'], data=r['data'],
files={'file': open(filename, 'rb')})
with open(filename, 'rb') as f:
resp = requests.post(r['upload_url'], data=r['data'], files={'file': f})
if resp.status_code != 204:
raise Exception('Pushbullet file upload failed with status {}'.
format(resp.status_code))
raise Exception(
f'Pushbullet file upload failed with status {resp.status_code}'
)
# noinspection PyTypeChecker
resp = requests.post('https://api.pushbullet.com/v2/pushes',
headers={'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json'},
data=json.dumps({
resp = requests.post(
'https://api.pushbullet.com/v2/pushes',
headers={
'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json',
},
data=json.dumps(
{
'type': 'file',
'device_iden': device['iden'] if device else None,
'device_iden': dev['iden'] if dev else None,
'file_name': r['file_name'],
'file_type': r['file_type'],
'file_url': r['file_url']}))
'file_url': r['file_url'],
}
),
)
if resp.status_code >= 400:
raise Exception('Pushbullet file push failed with status {}'.
format(resp.status_code))
raise Exception(
f'Pushbullet file push failed with status {resp.status_code}'
)
return {
'filename': r['file_name'],
'type': r['file_type'],
'url': r['file_url']
'url': r['file_url'],
}
@action
@ -178,17 +204,23 @@ class PushbulletPlugin(Plugin):
backend = get_backend('pushbullet')
device_id = backend.get_device_id() if backend else None
resp = requests.post('https://api.pushbullet.com/v2/ephemerals',
data=json.dumps({
resp = requests.post(
'https://api.pushbullet.com/v2/ephemerals',
data=json.dumps(
{
'type': 'push',
'push': {
'body': text,
'type': 'clip',
'source_device_iden': device_id,
},
}),
headers={'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json'})
}
),
headers={
'Authorization': 'Bearer ' + self.token,
'Content-Type': 'application/json',
},
)
resp.raise_for_status()

View file

@ -8,7 +8,11 @@ from typing import Optional, List
from platypush import Config
from platypush.context import get_bus
from platypush.message.event.qrcode import QrcodeScannedEvent
from platypush.message.response.qrcode import QrcodeGeneratedResponse, QrcodeDecodedResponse, ResultModel
from platypush.message.response.qrcode import (
QrcodeGeneratedResponse,
QrcodeDecodedResponse,
ResultModel,
)
from platypush.plugins import Plugin, action
from platypush.plugins.camera import CameraPlugin
from platypush.utils import get_plugin_class_by_name
@ -36,20 +40,28 @@ class QrcodePlugin(Plugin):
self.camera_plugin = camera_plugin
self._capturing = threading.Event()
def _get_camera(self, camera_plugin: Optional[str] = None, **config) -> CameraPlugin:
def _get_camera(
self, camera_plugin: Optional[str] = None, **config
) -> CameraPlugin:
camera_plugin = camera_plugin or self.camera_plugin
if not config:
config = Config.get(camera_plugin) or {}
config['stream_raw_frames'] = True
cls = get_plugin_class_by_name(camera_plugin)
assert cls and issubclass(cls, CameraPlugin), '{} is not a valid camera plugin'.format(camera_plugin)
assert cls and issubclass(
cls, CameraPlugin
), '{} is not a valid camera plugin'.format(camera_plugin)
return cls(**config)
# noinspection PyShadowingBuiltins
@action
def generate(self, content: str, output_file: Optional[str] = None, show: bool = False,
format: str = 'png') -> QrcodeGeneratedResponse:
def generate(
self,
content: str,
output_file: Optional[str] = None,
show: bool = False,
format: str = 'png',
) -> QrcodeGeneratedResponse:
"""
Generate a QR code.
If you configured the :class:`platypush.backend.http.HttpBackend` then you can also generate
@ -65,6 +77,7 @@ class QrcodePlugin(Plugin):
:return: :class:`platypush.message.response.qrcode.QrcodeGeneratedResponse`.
"""
import qrcode
qr = qrcode.make(content)
img = qr.get_image()
ret = {
@ -105,17 +118,26 @@ class QrcodePlugin(Plugin):
import numpy as np
from PIL import Image
assert isinstance(frame, np.ndarray), \
'Image conversion only works with numpy arrays for now (got {})'.format(type(frame))
assert isinstance(
frame, np.ndarray
), 'Image conversion only works with numpy arrays for now (got {})'.format(
type(frame)
)
mode = 'RGB'
if len(frame.shape) > 2 and frame.shape[2] == 4:
mode = 'RGBA'
return Image.frombuffer(mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1)
return Image.frombuffer(
mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1
)
@action
def start_scanning(self, camera_plugin: Optional[str] = None, duration: Optional[float] = None,
n_codes: Optional[int] = None) -> Optional[List[ResultModel]]:
def start_scanning(
self,
camera_plugin: Optional[str] = None,
duration: Optional[float] = None,
n_codes: Optional[int] = None,
) -> Optional[List[ResultModel]]:
"""
Decode QR-codes and bar codes using a camera.
@ -130,6 +152,7 @@ class QrcodePlugin(Plugin):
:class:`platypush.message.response.qrcode.ResultModel` instances with the scanned results,
"""
from pyzbar import pyzbar
assert not self._capturing.is_set(), 'A capturing process is already running'
camera = self._get_camera(camera_plugin)
@ -143,9 +166,11 @@ class QrcodePlugin(Plugin):
with camera:
start_time = time.time()
while self._capturing.is_set() \
and (not duration or time.time() < start_time + duration) \
and (not n_codes or len(codes) < n_codes):
while (
self._capturing.is_set()
and (not duration or time.time() < start_time + duration)
and (not n_codes or len(codes) < n_codes)
):
output = camera.get_stream()
with output.ready:
output.ready.wait()
@ -153,15 +178,21 @@ class QrcodePlugin(Plugin):
results = pyzbar.decode(img)
if results:
results = [
result for result in QrcodeDecodedResponse(results).output['results']
result
for result in QrcodeDecodedResponse(results).output[
'results'
]
if result['data'] not in last_results
or time.time() >= last_results_time + last_results_timeout
or time.time()
>= last_results_time + last_results_timeout
]
if results:
codes.extend(results)
get_bus().post(QrcodeScannedEvent(results=results))
last_results = {result['data']: result for result in results}
last_results = {
result['data']: result for result in results
}
last_results_time = time.time()
finally:
self._capturing.clear()
@ -172,4 +203,5 @@ class QrcodePlugin(Plugin):
def stop_scanning(self):
self._capturing.clear()
# vim:sw=4:ts=4:et: