import base64 import io import os import threading import time from typing import Optional, List import qrcode from pyzbar import pyzbar from PIL import Image from platypush import Config from platypush.context import get_bus from platypush.message.event.qrcode import QrcodeScannedEvent from platypush.plugins import Plugin, action from platypush.plugins.camera import CameraPlugin from platypush.schemas.qrcode import ( QrcodeDecodedSchema, QrcodeDecodedResultSchema, QrcodeGeneratedSchema, ) from platypush.utils import get_plugin_class_by_name class QrcodePlugin(Plugin): """ Plugin to generate and scan QR and bar codes. """ def __init__(self, camera_plugin: Optional[str] = None, **kwargs): """ :param camera_plugin: Name of the plugin that will be used as a camera to capture images (e.g. ``camera.cv`` or ``camera.pi``). This is required if you want to use the ``start_scanning`` action to scan QR codes from a camera. """ super().__init__(**kwargs) self.camera_plugin = camera_plugin self._capturing = threading.Event() def _get_camera( self, camera_plugin: Optional[str] = None, **config ) -> CameraPlugin: camera_plugin = camera_plugin or self.camera_plugin assert camera_plugin, 'No camera plugin specified' if not config: config = Config.get(camera_plugin) or {} config['stream_raw_frames'] = True cls = get_plugin_class_by_name(camera_plugin) assert cls and issubclass( cls, CameraPlugin ), f'{camera_plugin} is not a valid camera plugin' return cls(**config) @action def generate( self, content: str, output_file: Optional[str] = None, show: bool = False, format: str = 'png', ) -> dict: """ Generate a QR code. If you configured the :class:`platypush.backend.http.HttpBackend` then you can also generate codes directly from the browser through ``http://:/qrcode?content=...``. :param content: Text, URL or content of the QR code. :param output_file: If set then the QR code will be exported in the specified image file. Otherwise, a base64-encoded representation of its binary content will be returned in the response as ``data``. :param show: If True, and if the device where the application runs has an active display, then the generated QR code will be shown on display. :param format: Output image format (default: ``png``). :return: .. schema:: qrcode.QrcodeGeneratedSchema """ qr = qrcode.make(content) img = qr.get_image() ret = { 'content': content, 'format': format, } if show: img.show() if output_file: output_file = os.path.abspath(os.path.expanduser(output_file)) img.save(output_file, format=format) ret['image_file'] = output_file else: f = io.BytesIO() img.save(f, format=format) ret['data'] = base64.encodebytes(f.getvalue()).decode() return dict(QrcodeGeneratedSchema().dump(ret)) @action def decode(self, image_file: str) -> dict: """ Decode a QR code from an image file. :param image_file: Path of the image file. :return: .. schema:: qrcode.QrcodeDecodedSchema """ image_file = os.path.abspath(os.path.expanduser(image_file)) with open(image_file, 'rb') as f: img = Image.open(f) results = pyzbar.decode(img) return dict( QrcodeDecodedSchema().dump( { 'results': results, 'image_file': image_file, } ) ) @action def start_scanning( self, camera_plugin: Optional[str] = None, duration: Optional[float] = None, n_codes: Optional[int] = None, ) -> Optional[List[dict]]: """ Decode QR-codes and bar codes using a camera. :param camera_plugin: Camera plugin (overrides default ``camera_plugin``). :param duration: How long the capturing phase should run (default: until ``stop_scanning`` or app termination). :param n_codes: Stop after decoding this number of codes (default: None). :return: .. schema:: qrcode.QrcodeDecodedResultSchema(many=True) """ assert not self._capturing.is_set(), 'A capturing process is already running' camera = self._get_camera(camera_plugin) codes = [] last_results = {} last_results_timeout = 5.0 last_results_time = 0 self._capturing.set() try: with camera.open( stream=True, frames_dir=None, ) as session: camera.start_camera(session) start_time = time.time() while ( self._capturing.is_set() and (not duration or time.time() < start_time + duration) and (not n_codes or len(codes) < n_codes) ): img = camera.capture_frame(session) results = pyzbar.decode(img) if results: results = [ result for result in QrcodeDecodedResultSchema().dump( results, many=True ) if result['data'] not in last_results or time.time() >= last_results_time + last_results_timeout ] if results: codes.extend(results) get_bus().post(QrcodeScannedEvent(results=results)) last_results = {result['data']: result for result in results} last_results_time = time.time() finally: self._capturing.clear() return codes @action def stop_scanning(self): self._capturing.clear() # vim:sw=4:ts=4:et: