Support for scanning QR-codes and barcodes through a camera plugin

This commit is contained in:
Fabio Manganiello 2020-03-10 22:35:50 +01:00
parent c9db887505
commit 141275ecdf
3 changed files with 152 additions and 20 deletions

View file

@ -0,0 +1,19 @@
from typing import List
from platypush.message.event import Event
from platypush.message.response.qrcode import ResultModel
class QrcodeEvent(Event):
pass
class QrcodeScannedEvent(Event):
"""
Event triggered when a QR-code or bar code is scanned.
"""
def __init__(self, results: List[ResultModel], *args, **kwargs):
super().__init__(*args, results=results, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -19,27 +19,39 @@ from platypush.plugins import Plugin, action
class StreamingOutput:
def __init__(self):
def __init__(self, raw=False):
self.frame = None
self.raw_frame = None
self.raw = raw
self.buffer = io.BytesIO()
self.ready = threading.Condition()
@staticmethod
def is_new_frame(buf):
def is_new_frame(self, buf):
if self.raw:
return True
# JPEG header begin
return buf.startswith(b'\xff\xd8')
def write(self, buf):
if self.is_new_frame(buf):
# New frame, copy the existing buffer's content and notify all clients that it's available
self.buffer.truncate()
with self.ready:
self.frame = self.buffer.getvalue()
self.ready.notify_all()
self.buffer.seek(0)
if self.raw:
with self.ready:
self.raw_frame = buf
self.ready.notify_all()
else:
# New frame, copy the existing buffer's content and notify all clients that it's available
self.buffer.truncate()
with self.ready:
self.frame = self.buffer.getvalue()
self.ready.notify_all()
self.buffer.seek(0)
return self.buffer.write(buf)
def close(self):
self.buffer.close()
class CameraPlugin(Plugin):
"""
@ -75,7 +87,7 @@ class CameraPlugin(Plugin):
sleep_between_frames=_default_sleep_between_frames,
max_stored_frames=_max_stored_frames,
color_transform=_default_color_transform,
scale_x=None, scale_y=None, rotate=None, flip=None, **kwargs):
scale_x=None, scale_y=None, rotate=None, flip=None, stream_raw_frames=False, **kwargs):
"""
:param device_id: Index of the default video device to be used for
capturing (default: 0)
@ -145,6 +157,7 @@ class CameraPlugin(Plugin):
self.frames_dir = os.path.abspath(os.path.expanduser(frames_dir or self._default_frames_dir))
self.warmup_frames = warmup_frames
self.video_type = video_type
self.stream_raw_frames = stream_raw_frames
if isinstance(video_type, str):
import cv2
@ -321,12 +334,15 @@ class CameraPlugin(Plugin):
interpolation=cv2.INTER_CUBIC)
if self._output:
result, frame = cv2.imencode('.jpg', frame)
if not result:
self.logger.warning('Unable to convert frame to JPEG')
continue
if not self.stream_raw_frames:
result, frame = cv2.imencode('.jpg', frame)
if not result:
self.logger.warning('Unable to convert frame to JPEG')
continue
self._output.write(frame.tobytes())
self._output.write(frame.tobytes())
else:
self._output.write(frame)
elif frames_dir:
self._store_frame_to_file(frame=frame, frames_dir=frames_dir, image_file=image_file)
@ -584,12 +600,14 @@ class CameraPlugin(Plugin):
def __enter__(self):
device_id = self.default_device_id
self._output = StreamingOutput()
self._output = StreamingOutput(raw=self.stream_raw_frames)
self._init_device(device_id=device_id)
self.start_recording(device_id=device_id)
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop_recording(self.default_device_id)
if self._output:
self._output.close()
self._output = None

View file

@ -1,10 +1,20 @@
import base64
import io
import os
from typing import Optional
import threading
import time
from typing import Optional, List
from platypush.message.response.qrcode import QrcodeGeneratedResponse, QrcodeDecodedResponse
import numpy as np
from PIL import Image
from platypush import Config
from platypush.context import get_bus
from platypush.message.event.qrcode import QrcodeScannedEvent
from platypush.message.response.qrcode import QrcodeGeneratedResponse, QrcodeDecodedResponse, ResultModel
from platypush.plugins import Plugin, action
from platypush.plugins.camera import CameraPlugin
from platypush.utils import get_plugin_class_by_name
class QrcodePlugin(Plugin):
@ -19,13 +29,29 @@ class QrcodePlugin(Plugin):
"""
def __init__(self, **kwargs):
def __init__(self, camera_plugin: Optional[str] = None, **kwargs):
"""
:param camera_plugin: Name of the plugin that will be used as a camera to capture images (e.g.
``camera`` or ``camera.pi``).
"""
super().__init__(**kwargs)
self.camera_plugin = camera_plugin
self._capturing = threading.Event()
def _get_camera(self, camera_plugin: Optional[str] = None, **config) -> CameraPlugin:
camera_plugin = camera_plugin or self.camera_plugin
if not config:
config = Config.get(camera_plugin) or {}
config['stream_raw_frames'] = True
cls = get_plugin_class_by_name(camera_plugin)
assert cls and issubclass(cls, CameraPlugin), '{} is not a valid camera plugin'.format(camera_plugin)
return cls(**config)
# noinspection PyShadowingBuiltins
@action
def generate(self, content: str, output_file: Optional[str] = None, show: bool = False,
format: str = 'png') -> QrcodeGeneratedResponse:
format: str = 'png', camera_plugin: Optional[str] = None) -> QrcodeGeneratedResponse:
"""
Generate a QR code.
If you configured the :class`:platypush.backend.http.HttpBackend` then you can also generate
@ -38,6 +64,8 @@ class QrcodePlugin(Plugin):
:param show: If True, and if the device where the application runs has an active display,
then the generated QR code will be shown on display.
:param format: Output image format (default: ``png``).
:param camera_plugin: If set then this plugin (e.g. ``camera`` or ``camera.pi``) will be used to capture
live images from the camera and search for bar codes or QR-codes.
:return: :class:`platypush.message.response.qrcode.QrcodeGeneratedResponse`.
"""
import qrcode
@ -76,5 +104,72 @@ class QrcodePlugin(Plugin):
results = pyzbar.decode(img)
return QrcodeDecodedResponse(results)
def _convert_frame(self, frame) -> Image:
assert isinstance(frame, np.ndarray), 'Image conversion only works with numpy arrays for now'
mode = 'RGB'
if len(frame.shape) > 2 and frame.shape[2] == 4:
mode = 'RGBA'
return Image.frombuffer(mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1)
@action
def start_scanning(self, camera_plugin: Optional[str] = None, duration: Optional[float] = None,
n_codes: Optional[int] = None) -> Optional[List[ResultModel]]:
"""
Decode QR-codes and bar codes using a camera.
Triggers:
- :class:`platypush.message.event.qrcode.QrcodeScannedEvent` when a code is successfully scanned.
:param camera_plugin: Camera plugin (overrides default ``camera_plugin``).
:param duration: How long the capturing phase should run (default: until ``stop_scanning`` or app termination).
:param n_codes: Stop after decoding this number of codes (default: None).
:return: When ``duration`` or ``n_codes`` are specified or ``stop_scanning`` is called, it will return a list of
:class:`platypush.message.response.qrcode.ResultModel` instances with the scanned results,
"""
from pyzbar import pyzbar
assert not self._capturing.is_set(), 'A capturing process is already running'
camera = self._get_camera(camera_plugin)
codes = []
last_results = {}
last_results_timeout = 10.0
last_results_time = 0
self._capturing.set()
try:
with camera:
start_time = time.time()
while self._capturing.is_set() \
and (not duration or time.time() < start_time + duration) \
and (not n_codes or len(codes) < n_codes):
output = camera.get_stream()
with output.ready:
output.ready.wait()
img = self._convert_frame(output.raw_frame)
results = pyzbar.decode(img)
if results:
results = [
result for result in QrcodeDecodedResponse(results).output['results']
if result['data'] not in last_results
or time.time() >= last_results_time + last_results_timeout
]
if results:
codes.extend(results)
get_bus().post(QrcodeScannedEvent(results=results))
last_results = {result['data']: result for result in results}
last_results_time = time.time()
finally:
self._capturing.clear()
return codes
@action
def stop_scanning(self):
self._capturing.clear()
# vim:sw=4:ts=4:et: