platypush/platypush/plugins/qrcode/__init__.py

190 lines
6.2 KiB
Python

import base64
import io
import os
import threading
import time
from typing import Optional, List
import qrcode
from pyzbar import pyzbar
from PIL import Image
from platypush import Config
from platypush.context import get_bus
from platypush.message.event.qrcode import QrcodeScannedEvent
from platypush.plugins import Plugin, action
from platypush.plugins.camera import CameraPlugin
from platypush.schemas.qrcode import (
QrcodeDecodedSchema,
QrcodeDecodedResultSchema,
QrcodeGeneratedSchema,
)
from platypush.utils import get_plugin_class_by_name
class QrcodePlugin(Plugin):
"""
Plugin to generate and scan QR and bar codes.
"""
def __init__(self, camera_plugin: Optional[str] = None, **kwargs):
"""
:param camera_plugin: Name of the plugin that will be used as a camera
to capture images (e.g. ``camera.cv`` or ``camera.pi``). This is
required if you want to use the ``start_scanning`` action to scan
QR codes from a camera.
"""
super().__init__(**kwargs)
self.camera_plugin = camera_plugin
self._capturing = threading.Event()
def _get_camera(
self, camera_plugin: Optional[str] = None, **config
) -> CameraPlugin:
camera_plugin = camera_plugin or self.camera_plugin
assert camera_plugin, 'No camera plugin specified'
if not config:
config = Config.get(camera_plugin) or {}
config['stream_raw_frames'] = True
cls = get_plugin_class_by_name(camera_plugin)
assert cls and issubclass(
cls, CameraPlugin
), f'{camera_plugin} is not a valid camera plugin'
return cls(**config)
@action
def generate(
self,
content: str,
output_file: Optional[str] = None,
show: bool = False,
format: str = 'png',
) -> dict:
"""
Generate a QR code.
If you configured the :class:`platypush.backend.http.HttpBackend` then
you can also generate codes directly from the browser through
``http://<host>:<port>/qrcode?content=...``.
:param content: Text, URL or content of the QR code.
:param output_file: If set then the QR code will be exported in the
specified image file. Otherwise, a base64-encoded representation of
its binary content will be returned in the response as ``data``.
:param show: If True, and if the device where the application runs has
an active display, then the generated QR code will be shown on
display.
:param format: Output image format (default: ``png``).
:return: .. schema:: qrcode.QrcodeGeneratedSchema
"""
qr = qrcode.make(content)
img = qr.get_image()
ret = {
'content': content,
'format': format,
}
if show:
img.show()
if output_file:
output_file = os.path.abspath(os.path.expanduser(output_file))
img.save(output_file, format=format)
ret['image_file'] = output_file
else:
f = io.BytesIO()
img.save(f, format=format)
ret['data'] = base64.encodebytes(f.getvalue()).decode()
return dict(QrcodeGeneratedSchema().dump(ret))
@action
def decode(self, image_file: str) -> dict:
"""
Decode a QR code from an image file.
:param image_file: Path of the image file.
:return: .. schema:: qrcode.QrcodeDecodedSchema
"""
image_file = os.path.abspath(os.path.expanduser(image_file))
with open(image_file, 'rb') as f:
img = Image.open(f)
results = pyzbar.decode(img)
return dict(
QrcodeDecodedSchema().dump(
{
'results': results,
'image_file': image_file,
}
)
)
@action
def start_scanning(
self,
camera_plugin: Optional[str] = None,
duration: Optional[float] = None,
n_codes: Optional[int] = None,
) -> Optional[List[dict]]:
"""
Decode QR-codes and bar codes using a camera.
:param camera_plugin: Camera plugin (overrides default ``camera_plugin``).
:param duration: How long the capturing phase should run (default:
until ``stop_scanning`` or app termination).
:param n_codes: Stop after decoding this number of codes (default: None).
:return: .. schema:: qrcode.QrcodeDecodedResultSchema(many=True)
"""
assert not self._capturing.is_set(), 'A capturing process is already running'
camera = self._get_camera(camera_plugin)
codes = []
last_results = {}
last_results_timeout = 5.0
last_results_time = 0
self._capturing.set()
try:
with camera.open(
stream=True,
frames_dir=None,
) as session:
camera.start_camera(session)
start_time = time.time()
while (
self._capturing.is_set()
and (not duration or time.time() < start_time + duration)
and (not n_codes or len(codes) < n_codes)
):
img = camera.capture_frame(session)
results = pyzbar.decode(img)
if results:
results = [
result
for result in QrcodeDecodedResultSchema().dump(
results, many=True
)
if result['data'] not in last_results
or time.time() >= last_results_time + last_results_timeout
]
if results:
codes.extend(results)
get_bus().post(QrcodeScannedEvent(results=results))
last_results = {result['data']: result for result in results}
last_results_time = time.time()
finally:
self._capturing.clear()
return codes
@action
def stop_scanning(self):
self._capturing.clear()
# vim:sw=4:ts=4:et: