forked from platypush/platypush
177 lines
6.9 KiB
Python
177 lines
6.9 KiB
Python
import base64
|
|
import io
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Optional, List
|
|
|
|
import numpy as np
|
|
from PIL import Image
|
|
|
|
from platypush import Config
|
|
from platypush.context import get_bus
|
|
from platypush.message.event.qrcode import QrcodeScannedEvent
|
|
from platypush.message.response.qrcode import QrcodeGeneratedResponse, QrcodeDecodedResponse, ResultModel
|
|
from platypush.plugins import Plugin, action
|
|
from platypush.plugins.camera import CameraPlugin
|
|
from platypush.utils import get_plugin_class_by_name
|
|
|
|
|
|
class QrcodePlugin(Plugin):
|
|
"""
|
|
Plugin to generate and scan QR and bar codes.
|
|
|
|
Requires:
|
|
|
|
* **numpy** (``pip install numpy``).
|
|
* **qrcode** (``pip install 'qrcode[pil]'``) for QR generation.
|
|
* **pyzbar** (``pip install pyzbar``) for decoding code from images.
|
|
* **Pillow** (``pip install Pillow``) for image management.
|
|
|
|
"""
|
|
|
|
def __init__(self, camera_plugin: Optional[str] = None, **kwargs):
|
|
"""
|
|
:param camera_plugin: Name of the plugin that will be used as a camera to capture images (e.g.
|
|
``camera`` or ``camera.pi``).
|
|
"""
|
|
super().__init__(**kwargs)
|
|
self.camera_plugin = camera_plugin
|
|
self._capturing = threading.Event()
|
|
|
|
def _get_camera(self, camera_plugin: Optional[str] = None, **config) -> CameraPlugin:
|
|
camera_plugin = camera_plugin or self.camera_plugin
|
|
if not config:
|
|
config = Config.get(camera_plugin) or {}
|
|
config['stream_raw_frames'] = True
|
|
|
|
cls = get_plugin_class_by_name(camera_plugin)
|
|
assert cls and issubclass(cls, CameraPlugin), '{} is not a valid camera plugin'.format(camera_plugin)
|
|
return cls(**config)
|
|
|
|
# noinspection PyShadowingBuiltins
|
|
@action
|
|
def generate(self, content: str, output_file: Optional[str] = None, show: bool = False,
|
|
format: str = 'png', camera_plugin: Optional[str] = None) -> QrcodeGeneratedResponse:
|
|
"""
|
|
Generate a QR code.
|
|
If you configured the :class`:platypush.backend.http.HttpBackend` then you can also generate
|
|
codes directly from the browser through ``http://<host>:<port>/qrcode?content=...``.
|
|
|
|
:param content: Text, URL or content of the QR code.
|
|
:param output_file: If set then the QR code will be exported in the specified image file.
|
|
Otherwise, a base64-encoded representation of its binary content will be returned in
|
|
the response as ``data``.
|
|
:param show: If True, and if the device where the application runs has an active display,
|
|
then the generated QR code will be shown on display.
|
|
:param format: Output image format (default: ``png``).
|
|
:param camera_plugin: If set then this plugin (e.g. ``camera`` or ``camera.pi``) will be used to capture
|
|
live images from the camera and search for bar codes or QR-codes.
|
|
:return: :class:`platypush.message.response.qrcode.QrcodeGeneratedResponse`.
|
|
"""
|
|
import qrcode
|
|
qr = qrcode.make(content)
|
|
img = qr.get_image()
|
|
ret = {
|
|
'content': content,
|
|
'format': format,
|
|
}
|
|
|
|
if show:
|
|
img.show()
|
|
if output_file:
|
|
output_file = os.path.abspath(os.path.expanduser(output_file))
|
|
img.save(output_file, format=format)
|
|
ret['image_file'] = output_file
|
|
else:
|
|
f = io.BytesIO()
|
|
img.save(f, format=format)
|
|
ret['data'] = base64.encodebytes(f.getvalue()).decode()
|
|
|
|
return QrcodeGeneratedResponse(**ret)
|
|
|
|
@action
|
|
def decode(self, image_file: str) -> QrcodeDecodedResponse:
|
|
"""
|
|
Decode a QR code from an image file.
|
|
|
|
:param image_file: Path of the image file.
|
|
"""
|
|
from pyzbar import pyzbar
|
|
from PIL import Image
|
|
|
|
image_file = os.path.abspath(os.path.expanduser(image_file))
|
|
img = Image.open(image_file)
|
|
results = pyzbar.decode(img)
|
|
return QrcodeDecodedResponse(results)
|
|
|
|
def _convert_frame(self, frame) -> Image:
|
|
assert isinstance(frame, np.ndarray), \
|
|
'Image conversion only works with numpy arrays for now (got {})'.format(type(frame))
|
|
mode = 'RGB'
|
|
if len(frame.shape) > 2 and frame.shape[2] == 4:
|
|
mode = 'RGBA'
|
|
|
|
return Image.frombuffer(mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1)
|
|
|
|
@action
|
|
def start_scanning(self, camera_plugin: Optional[str] = None, duration: Optional[float] = None,
|
|
n_codes: Optional[int] = None) -> Optional[List[ResultModel]]:
|
|
"""
|
|
Decode QR-codes and bar codes using a camera.
|
|
|
|
Triggers:
|
|
|
|
- :class:`platypush.message.event.qrcode.QrcodeScannedEvent` when a code is successfully scanned.
|
|
|
|
:param camera_plugin: Camera plugin (overrides default ``camera_plugin``).
|
|
:param duration: How long the capturing phase should run (default: until ``stop_scanning`` or app termination).
|
|
:param n_codes: Stop after decoding this number of codes (default: None).
|
|
:return: When ``duration`` or ``n_codes`` are specified or ``stop_scanning`` is called, it will return a list of
|
|
:class:`platypush.message.response.qrcode.ResultModel` instances with the scanned results,
|
|
"""
|
|
from pyzbar import pyzbar
|
|
assert not self._capturing.is_set(), 'A capturing process is already running'
|
|
|
|
camera = self._get_camera(camera_plugin)
|
|
codes = []
|
|
last_results = {}
|
|
last_results_timeout = 10.0
|
|
last_results_time = 0
|
|
self._capturing.set()
|
|
|
|
try:
|
|
with camera:
|
|
start_time = time.time()
|
|
|
|
while self._capturing.is_set() \
|
|
and (not duration or time.time() < start_time + duration) \
|
|
and (not n_codes or len(codes) < n_codes):
|
|
output = camera.get_stream()
|
|
with output.ready:
|
|
output.ready.wait()
|
|
img = self._convert_frame(output.raw_frame)
|
|
results = pyzbar.decode(img)
|
|
if results:
|
|
results = [
|
|
result for result in QrcodeDecodedResponse(results).output['results']
|
|
if result['data'] not in last_results
|
|
or time.time() >= last_results_time + last_results_timeout
|
|
]
|
|
|
|
if results:
|
|
codes.extend(results)
|
|
get_bus().post(QrcodeScannedEvent(results=results))
|
|
last_results = {result['data']: result for result in results}
|
|
last_results_time = time.time()
|
|
finally:
|
|
self._capturing.clear()
|
|
|
|
return codes
|
|
|
|
@action
|
|
def stop_scanning(self):
|
|
self._capturing.clear()
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|