Merge branch 'master' into snyk-upgrade-7087de73af11ecfaae1f5f0a8dc827f2

This commit is contained in:
Fabio Manganiello 2024-05-10 01:44:19 +02:00 committed by GitHub
commit 404737dc24
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 342 additions and 248 deletions

View file

@ -1,5 +0,0 @@
``qrcode``
=====================================
.. automodule:: platypush.message.response.qrcode
:members:

View file

@ -1,5 +0,0 @@
``ssh``
==================================
.. automodule:: platypush.message.response.ssh
:members:

View file

@ -8,7 +8,5 @@ Responses
platypush/responses/google.drive.rst platypush/responses/google.drive.rst
platypush/responses/printer.cups.rst platypush/responses/printer.cups.rst
platypush/responses/qrcode.rst
platypush/responses/ssh.rst
platypush/responses/tensorflow.rst platypush/responses/tensorflow.rst
platypush/responses/translate.rst platypush/responses/translate.rst

View file

@ -20,7 +20,7 @@
"sass-loader": "^10.5.2", "sass-loader": "^10.5.2",
"vue": "^3.4.23", "vue": "^3.4.23",
"vue-router": "^4.3.2", "vue-router": "^4.3.2",
"vue-skycons": "^4.2.0", "vue-skycons": "^4.3.4",
"w3css": "^2.7.0" "w3css": "^2.7.0"
}, },
"devDependencies": { "devDependencies": {
@ -12219,9 +12219,9 @@
} }
}, },
"node_modules/vue-skycons": { "node_modules/vue-skycons": {
"version": "4.2.0", "version": "4.3.4",
"resolved": "https://registry.npmjs.org/vue-skycons/-/vue-skycons-4.2.0.tgz", "resolved": "https://registry.npmjs.org/vue-skycons/-/vue-skycons-4.3.4.tgz",
"integrity": "sha512-Zbw9lHXNjorpzReEqsyvPty3NIY2GCMKKZmGLpt+XKUrxEJvqVcPA+OwMxhnIgZvqiZH7b+tvHeGtq2ximf8Vg==" "integrity": "sha512-fa1P6qoi2TBuuG/Us++fHlET+ywpfFbhAYPWjpYHF0/i5MNYbjKKKOmzqbImWen+rwg2drF6UGDgxIYhweXDlQ=="
}, },
"node_modules/vue-style-loader": { "node_modules/vue-style-loader": {
"version": "4.1.3", "version": "4.1.3",

View file

@ -20,7 +20,7 @@
"sass-loader": "^10.5.2", "sass-loader": "^10.5.2",
"vue": "^3.4.23", "vue": "^3.4.23",
"vue-router": "^4.3.2", "vue-router": "^4.3.2",
"vue-skycons": "^4.2.0", "vue-skycons": "^4.3.4",
"w3css": "^2.7.0" "w3css": "^2.7.0"
}, },
"devDependencies": { "devDependencies": {

Binary file not shown.

View file

@ -1,18 +1,19 @@
from typing import List from typing import List
from platypush.message.event import Event from platypush.message.event import Event
from platypush.message.response.qrcode import ResultModel
class QrcodeEvent(Event):
pass
class QrcodeScannedEvent(Event): class QrcodeScannedEvent(Event):
""" """
Event triggered when a QR-code or bar code is scanned. Event triggered when a QR-code or bar code is scanned.
""" """
def __init__(self, results: List[ResultModel], *args, **kwargs):
def __init__(self, results: List[dict], *args, **kwargs):
"""
:param results: List of decoded QR code results:
.. schema:: qrcode.QrcodeDecodedResultSchema(many=True)
"""
super().__init__(*args, results=results, **kwargs) super().__init__(*args, results=results, **kwargs)

View file

@ -1,61 +0,0 @@
import base64
from typing import Optional, List
from pyzbar.pyzbar import Decoded
from pyzbar.locations import Rect
from platypush.message import Mapping
from platypush.message.response import Response
class QrcodeResponse(Response):
pass
class QrcodeGeneratedResponse(QrcodeResponse):
# noinspection PyShadowingBuiltins
def __init__(self,
content: str,
format: str,
data: Optional[str] = None,
image_file: Optional[str] = None,
*args, **kwargs):
super().__init__(*args, output={
'text': content,
'data': data,
'format': format,
'image_file': image_file,
}, **kwargs)
class RectModel(Mapping):
def __init__(self, rect: Rect):
super().__init__()
self.left = rect.left
self.top = rect.top
self.width = rect.width
self.height = rect.height
class ResultModel(Mapping):
def __init__(self, result: Decoded, *args, **kwargs):
super().__init__(*args, **kwargs)
try:
data = result.data.decode()
except (ValueError, TypeError):
data = base64.encodebytes(result.data).decode()
self.data = data
self.type = result.type
self.rect = dict(RectModel(result.rect)) if result.rect else {}
class QrcodeDecodedResponse(QrcodeResponse):
def __init__(self, results: List[Decoded], image_file: Optional[str] = None, *args, **kwargs):
super().__init__(*args, output={
'image_file': image_file,
'results': [dict(ResultModel(result)) for result in results],
}, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -1,18 +0,0 @@
from platypush.message.response import Response
class SSHResponse(Response):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class SSHKeygenResponse(SSHResponse):
def __init__(self, fingerprint: str, key_file: str, pub_key_file: str, *args, **kwargs):
super().__init__(*args, output={
'fingerprint': fingerprint,
'key_file': key_file,
'pub_key_file': pub_key_file,
}, **kwargs)
# vim:sw=4:ts=4:et:

View file

@ -5,16 +5,20 @@ import threading
import time import time
from typing import Optional, List from typing import Optional, List
import qrcode
from pyzbar import pyzbar
from PIL import Image
from platypush import Config from platypush import Config
from platypush.context import get_bus from platypush.context import get_bus
from platypush.message.event.qrcode import QrcodeScannedEvent from platypush.message.event.qrcode import QrcodeScannedEvent
from platypush.message.response.qrcode import (
QrcodeGeneratedResponse,
QrcodeDecodedResponse,
ResultModel,
)
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
from platypush.plugins.camera import CameraPlugin from platypush.plugins.camera import CameraPlugin
from platypush.schemas.qrcode import (
QrcodeDecodedSchema,
QrcodeDecodedResultSchema,
QrcodeGeneratedSchema,
)
from platypush.utils import get_plugin_class_by_name from platypush.utils import get_plugin_class_by_name
@ -25,8 +29,10 @@ class QrcodePlugin(Plugin):
def __init__(self, camera_plugin: Optional[str] = None, **kwargs): def __init__(self, camera_plugin: Optional[str] = None, **kwargs):
""" """
:param camera_plugin: Name of the plugin that will be used as a camera to capture images (e.g. :param camera_plugin: Name of the plugin that will be used as a camera
``camera.cv`` or ``camera.pi``). to capture images (e.g. ``camera.cv`` or ``camera.pi``). This is
required if you want to use the ``start_scanning`` action to scan
QR codes from a camera.
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
self.camera_plugin = camera_plugin self.camera_plugin = camera_plugin
@ -36,6 +42,7 @@ class QrcodePlugin(Plugin):
self, camera_plugin: Optional[str] = None, **config self, camera_plugin: Optional[str] = None, **config
) -> CameraPlugin: ) -> CameraPlugin:
camera_plugin = camera_plugin or self.camera_plugin camera_plugin = camera_plugin or self.camera_plugin
assert camera_plugin, 'No camera plugin specified'
if not config: if not config:
config = Config.get(camera_plugin) or {} config = Config.get(camera_plugin) or {}
config['stream_raw_frames'] = True config['stream_raw_frames'] = True
@ -43,7 +50,7 @@ class QrcodePlugin(Plugin):
cls = get_plugin_class_by_name(camera_plugin) cls = get_plugin_class_by_name(camera_plugin)
assert cls and issubclass( assert cls and issubclass(
cls, CameraPlugin cls, CameraPlugin
), '{} is not a valid camera plugin'.format(camera_plugin) ), f'{camera_plugin} is not a valid camera plugin'
return cls(**config) return cls(**config)
@action @action
@ -53,23 +60,24 @@ class QrcodePlugin(Plugin):
output_file: Optional[str] = None, output_file: Optional[str] = None,
show: bool = False, show: bool = False,
format: str = 'png', format: str = 'png',
) -> QrcodeGeneratedResponse: ) -> dict:
""" """
Generate a QR code. Generate a QR code.
If you configured the :class:`platypush.backend.http.HttpBackend` then you can also generate
codes directly from the browser through ``http://<host>:<port>/qrcode?content=...``. If you configured the :class:`platypush.backend.http.HttpBackend` then
you can also generate codes directly from the browser through
``http://<host>:<port>/qrcode?content=...``.
:param content: Text, URL or content of the QR code. :param content: Text, URL or content of the QR code.
:param output_file: If set then the QR code will be exported in the specified image file. :param output_file: If set then the QR code will be exported in the
Otherwise, a base64-encoded representation of its binary content will be returned in specified image file. Otherwise, a base64-encoded representation of
the response as ``data``. its binary content will be returned in the response as ``data``.
:param show: If True, and if the device where the application runs has an active display, :param show: If True, and if the device where the application runs has
then the generated QR code will be shown on display. an active display, then the generated QR code will be shown on
display.
:param format: Output image format (default: ``png``). :param format: Output image format (default: ``png``).
:return: :class:`platypush.message.response.qrcode.QrcodeGeneratedResponse`. :return: .. schema:: qrcode.QrcodeGeneratedSchema
""" """
import qrcode
qr = qrcode.make(content) qr = qrcode.make(content)
img = qr.get_image() img = qr.get_image()
ret = { ret = {
@ -79,6 +87,7 @@ class QrcodePlugin(Plugin):
if show: if show:
img.show() img.show()
if output_file: if output_file:
output_file = os.path.abspath(os.path.expanduser(output_file)) output_file = os.path.abspath(os.path.expanduser(output_file))
img.save(output_file, format=format) img.save(output_file, format=format)
@ -88,39 +97,27 @@ class QrcodePlugin(Plugin):
img.save(f, format=format) img.save(f, format=format)
ret['data'] = base64.encodebytes(f.getvalue()).decode() ret['data'] = base64.encodebytes(f.getvalue()).decode()
return QrcodeGeneratedResponse(**ret) return dict(QrcodeGeneratedSchema().dump(ret))
@action @action
def decode(self, image_file: str) -> QrcodeDecodedResponse: def decode(self, image_file: str) -> dict:
""" """
Decode a QR code from an image file. Decode a QR code from an image file.
:param image_file: Path of the image file. :param image_file: Path of the image file.
:return: .. schema:: qrcode.QrcodeDecodedSchema
""" """
from pyzbar import pyzbar
from PIL import Image
image_file = os.path.abspath(os.path.expanduser(image_file)) image_file = os.path.abspath(os.path.expanduser(image_file))
img = Image.open(image_file) with open(image_file, 'rb') as f:
img = Image.open(f)
results = pyzbar.decode(img) results = pyzbar.decode(img)
return QrcodeDecodedResponse(results) return dict(
QrcodeDecodedSchema().dump(
@staticmethod {
def _convert_frame(frame): 'results': results,
import numpy as np 'image_file': image_file,
from PIL import Image }
assert isinstance(
frame, np.ndarray
), 'Image conversion only works with numpy arrays for now (got {})'.format(
type(frame)
) )
mode = 'RGB'
if len(frame.shape) > 2 and frame.shape[2] == 4:
mode = 'RGBA'
return Image.frombuffer(
mode, (frame.shape[1], frame.shape[0]), frame, 'raw', mode, 0, 1
) )
@action @action
@ -129,29 +126,31 @@ class QrcodePlugin(Plugin):
camera_plugin: Optional[str] = None, camera_plugin: Optional[str] = None,
duration: Optional[float] = None, duration: Optional[float] = None,
n_codes: Optional[int] = None, n_codes: Optional[int] = None,
) -> Optional[List[ResultModel]]: ) -> Optional[List[dict]]:
""" """
Decode QR-codes and bar codes using a camera. Decode QR-codes and bar codes using a camera.
:param camera_plugin: Camera plugin (overrides default ``camera_plugin``). :param camera_plugin: Camera plugin (overrides default ``camera_plugin``).
:param duration: How long the capturing phase should run (default: until ``stop_scanning`` or app termination). :param duration: How long the capturing phase should run (default:
until ``stop_scanning`` or app termination).
:param n_codes: Stop after decoding this number of codes (default: None). :param n_codes: Stop after decoding this number of codes (default: None).
:return: When ``duration`` or ``n_codes`` are specified or ``stop_scanning`` is called, it will return a list of :return: .. schema:: qrcode.QrcodeDecodedResultSchema(many=True)
:class:`platypush.message.response.qrcode.ResultModel` instances with the scanned results,
""" """
from pyzbar import pyzbar
assert not self._capturing.is_set(), 'A capturing process is already running' assert not self._capturing.is_set(), 'A capturing process is already running'
camera = self._get_camera(camera_plugin) camera = self._get_camera(camera_plugin)
codes = [] codes = []
last_results = {} last_results = {}
last_results_timeout = 10.0 last_results_timeout = 5.0
last_results_time = 0 last_results_time = 0
self._capturing.set() self._capturing.set()
try: try:
with camera: with camera.open(
stream=True,
frames_dir=None,
) as session:
camera.start_camera(session)
start_time = time.time() start_time = time.time()
while ( while (
@ -159,28 +158,23 @@ class QrcodePlugin(Plugin):
and (not duration or time.time() < start_time + duration) and (not duration or time.time() < start_time + duration)
and (not n_codes or len(codes) < n_codes) and (not n_codes or len(codes) < n_codes)
): ):
output = camera.get_stream() img = camera.capture_frame(session)
with output.ready:
output.ready.wait()
img = self._convert_frame(output.raw_frame)
results = pyzbar.decode(img) results = pyzbar.decode(img)
if results: if results:
results = [ results = [
result result
for result in QrcodeDecodedResponse(results).output[ for result in QrcodeDecodedResultSchema().dump(
'results' results, many=True
] )
if result['data'] not in last_results if result['data'] not in last_results
or time.time() or time.time() >= last_results_time + last_results_timeout
>= last_results_time + last_results_timeout
] ]
if results: if results:
codes.extend(results) codes.extend(results)
get_bus().post(QrcodeScannedEvent(results=results)) get_bus().post(QrcodeScannedEvent(results=results))
last_results = { last_results = {result['data']: result for result in results}
result['data']: result for result in results
}
last_results_time = time.time() last_results_time = time.time()
finally: finally:
self._capturing.clear() self._capturing.clear()

View file

@ -21,7 +21,7 @@ manifest:
- python-numpy - python-numpy
- python-pillow - python-pillow
- python-qrcode - python-qrcode
- pyzbar # - pyzbar # Only available via yay for now
pip: pip:
- numpy - numpy
- qrcode - qrcode

View file

@ -19,15 +19,18 @@ from stat import (
from typing import Optional, Dict, Tuple, List, Union, Any from typing import Optional, Dict, Tuple, List, Union, Any
from paramiko import DSSKey, RSAKey, SSHClient, WarningPolicy, SFTPClient from paramiko import DSSKey, RSAKey, SSHClient, WarningPolicy, SFTPClient
from paramiko.py3compat import u
try: try:
from paramiko import GSS_AUTH_AVAILABLE from paramiko.util import u
except ImportError:
from paramiko.py3compat import u # type: ignore
try:
from paramiko import GSS_AUTH_AVAILABLE # type: ignore
except ImportError: except ImportError:
from paramiko.ssh_gss import GSS_AUTH_AVAILABLE from paramiko.ssh_gss import GSS_AUTH_AVAILABLE
from platypush import Response from platypush import Response
from platypush.message.response.ssh import SSHKeygenResponse
from platypush.plugins import Plugin, action from platypush.plugins import Plugin, action
from platypush.plugins.ssh.tunnel.forward import forward_tunnel from platypush.plugins.ssh.tunnel.forward import forward_tunnel
from platypush.plugins.ssh.tunnel.reverse import reverse_tunnel, close_tunnel from platypush.plugins.ssh.tunnel.reverse import reverse_tunnel, close_tunnel
@ -44,8 +47,8 @@ class SshPlugin(Plugin):
self, key_file: Optional[str] = None, passphrase: Optional[str] = None, **kwargs self, key_file: Optional[str] = None, passphrase: Optional[str] = None, **kwargs
): ):
""" """
:param key_file: Default key file (default: any "id_rsa", "id_dsa", "id_ecdsa", or "id_ed25519" key discoverable :param key_file: Default key file (default: any "id_rsa", "id_dsa",
in ``~/.ssh/``. "id_ecdsa", or "id_ed25519" key discoverable in ``~/.ssh/``.
:param passphrase: Key file passphrase (default: None). :param passphrase: Key file passphrase (default: None).
""" """
super().__init__(**kwargs) super().__init__(**kwargs)
@ -71,38 +74,46 @@ class SshPlugin(Plugin):
if host.find('@') >= 0: if host.find('@') >= 0:
user, host = host.split('@') user, host = host.split('@')
if host.find(':') >= 0: if host.find(':') >= 0:
host, port = host.split(':') host, p = host.split(':')
port = int(p)
if not user: if not user:
user = getpass.getuser() user = getpass.getuser()
return host, int(port), user return host, port, user
# noinspection PyShadowingBuiltins
@action @action
def keygen( def keygen(
self, self,
filename: str, filename: str,
type: str = 'rsa', type: str = 'rsa', # pylint: disable=redefined-builtin
bits: int = 4096, bits: int = 4096,
comment: Optional[str] = None, comment: Optional[str] = None,
passphrase: Optional[str] = None, passphrase: Optional[str] = None,
) -> SSHKeygenResponse: ) -> dict:
""" """
Generate an SSH keypair. Generate an SSH keypair.
:param filename: Output file name for the private key (the public key will be stored in <filename>.pub). :param filename: Output file name for the private key (the public key
will be stored in <filename>.pub).
:param type: Encryption algorithm, either "rsa" or "dsa" (default: "rsa"). :param type: Encryption algorithm, either "rsa" or "dsa" (default: "rsa").
:param bits: Key length in bits (default: 4096). :param bits: Key length in bits (default: 4096).
:param comment: Key comment (default: None). :param comment: Key comment (default: None).
:param passphrase: Key passphrase (default: None). :param passphrase: Key passphrase (default: None).
:return: :class:`platypush.message.response.ssh.SSHKeygenResponse`. :return:
.. code-block:: json
{
"fingerprint": "SHA256:...",
"key_file": "private_key_file",
"pub_key_file": "public_key_file"
}
""" """
assert type != 'dsa' or bits <= 1024, 'DSA keys support a maximum of 1024 bits' assert type != 'dsa' or bits <= 1024, 'DSA keys support a maximum of 1024 bits'
assert ( assert (
type in self.key_dispatch_table type in self.key_dispatch_table
), 'No such type: {}. Available types: {}'.format( ), f'No such type: {type}. Available types: {self.key_dispatch_table.keys()}'
type, self.key_dispatch_table.keys()
)
if filename: if filename:
filename = os.path.abspath(os.path.expanduser(filename)) filename = os.path.abspath(os.path.expanduser(filename))
@ -110,22 +121,23 @@ class SshPlugin(Plugin):
prv = self.key_dispatch_table[type].generate(bits=bits) prv = self.key_dispatch_table[type].generate(bits=bits)
prv.write_private_key_file(filename=filename, password=passphrase) prv.write_private_key_file(filename=filename, password=passphrase)
pub = self.key_dispatch_table[type](filename=filename, password=passphrase) pub = self.key_dispatch_table[type](filename=filename, password=passphrase)
pub_file = '{filename}.pub'.format(filename=filename) pub_file = f'{filename}.pub'
with open(pub_file, 'w') as f: with open(pub_file, 'w') as f:
f.write('{name} {key}'.format(name=pub.get_name(), key=pub.get_base64())) f.write(f'{pub.get_name()} {pub.get_base64()}')
if comment: if comment:
f.write(' ' + comment) f.write(' ' + comment)
hash = u(hexlify(pub.get_fingerprint())) return {
return SSHKeygenResponse( 'fingerprint': u(hexlify(pub.get_fingerprint())),
fingerprint=hash, key_file=filename, pub_key_file=pub_file 'key_file': filename,
) 'pub_key_file': pub_file,
}
def run(self, *args, **kwargs): def run(self, *args, **kwargs):
try: try:
return super().run(*args, **kwargs) return super().run(*args, **kwargs)
except Exception as e: except Exception as e:
raise AssertionError(e) raise AssertionError(e) from e
def _connect( def _connect(
self, self,
@ -144,9 +156,10 @@ class SshPlugin(Plugin):
key = (host, port, user) key = (host, port, user)
if key in self._sessions: if key in self._sessions:
self.logger.info( self.logger.info(
'[Connect] The SSH session is already active: {user}@{host}:{port}'.format( '[Connect] The SSH session is already active: %s@%s:%d',
user=user, host=host, port=port user,
) host,
port,
) )
return self._sessions[key] return self._sessions[key]
@ -178,7 +191,7 @@ class SshPlugin(Plugin):
return client return client
except Exception as e: except Exception as e:
self.logger.exception(e) self.logger.exception(e)
raise AssertionError('Connection to {} failed: {}'.format(host, str(e))) raise AssertionError(f'Connection to {host} failed: {type(e)}: {e}')
@action @action
def connect( def connect(
@ -231,9 +244,7 @@ class SshPlugin(Plugin):
key = (host, port, user) key = (host, port, user)
if key not in self._sessions: if key not in self._sessions:
self.logger.info( self.logger.info(
'[Disconnect] The SSH session is not active: {user}@{host}:{port}'.format( '[Disconnect] The SSH session is not active: %s@%s:%d', user, host, port
user=user, host=host, port=port
)
) )
session = self._sessions[key] session = self._sessions[key]
@ -269,13 +280,13 @@ class SshPlugin(Plugin):
def decode(buf: bytes) -> str: def decode(buf: bytes) -> str:
try: try:
buf = buf.decode() s_buf = buf.decode()
except (ValueError, TypeError): except (ValueError, TypeError):
buf = base64.encodebytes(buf).decode() s_buf = base64.encodebytes(buf).decode()
if buf.endswith('\n'): if s_buf.endswith('\n'):
buf = buf[:-1] s_buf = s_buf[:-1]
return buf return s_buf
try: try:
_in, _out, _err = client.exec_command(cmd, timeout=timeout, environment=env) _in, _out, _err = client.exec_command(cmd, timeout=timeout, environment=env)
@ -300,6 +311,9 @@ class SshPlugin(Plugin):
@staticmethod @staticmethod
def is_directory(sftp: SFTPClient, path: str) -> bool: def is_directory(sftp: SFTPClient, path: str) -> bool:
f = sftp.lstat(path) f = sftp.lstat(path)
if f.st_mode is None:
return False
return S_ISDIR(f.st_mode) return S_ISDIR(f.st_mode)
@classmethod @classmethod
@ -308,7 +322,7 @@ class SshPlugin(Plugin):
folders = [] folders = []
for f in sftp.listdir_attr(path): for f in sftp.listdir_attr(path):
if S_ISDIR(f.st_mode): if f.st_mode is not None and S_ISDIR(f.st_mode):
folders.append(f.filename) folders.append(f.filename)
else: else:
files.append(f.filename) files.append(f.filename)
@ -317,8 +331,7 @@ class SshPlugin(Plugin):
for folder in folders: for folder in folders:
new_path = os.path.join(path, folder) new_path = os.path.join(path, folder)
for x in cls.sftp_walk(sftp, new_path): yield from cls.sftp_walk(sftp, new_path)
yield x
def sftp_get( def sftp_get(
self, self,
@ -341,10 +354,9 @@ class SshPlugin(Plugin):
for file in files: for file in files:
self.logger.info( self.logger.info(
'Downloading file {} from {} to {}'.format( 'Downloading file %s from %s to %s', file, path, new_local_path
file, path, new_local_path
)
) )
self.sftp_get( self.sftp_get(
sftp, sftp,
os.path.join(remote_path, path, file), os.path.join(remote_path, path, file),
@ -421,16 +433,17 @@ class SshPlugin(Plugin):
try: try:
sftp.mkdir(remote_path) sftp.mkdir(remote_path)
except Exception as e: except Exception as e:
self.logger.warning(f'mkdir {remote_path}: {e}') self.logger.warning(
'mkdir %s failed: %s: %s', remote_path, type(e), e
)
assert ( assert (
recursive recursive
), '{} is a directory but recursive has been set to False'.format( ), f'{local_path} is a directory but recursive has been set to False'
local_path
)
assert self.is_directory( assert self.is_directory(
sftp, remote_path sftp, remote_path
), '{} is not a directory on the remote host'.format(remote_path) ), f'{remote_path} is not a directory on the remote host'
sftp.chdir(remote_path) sftp.chdir(remote_path)
os.chdir(local_path) os.chdir(local_path)
@ -439,12 +452,14 @@ class SshPlugin(Plugin):
try: try:
sftp.mkdir(path) sftp.mkdir(path)
except Exception as e: except Exception as e:
self.logger.warning(f'mkdir {remote_path}: {e}') self.logger.warning(
'mkdir %s failed: %s: %s', remote_path, type(e), e
)
for file in files: for file in files:
src = os.path.join(path, file) src = os.path.join(path, file)
dst = os.path.join(path, file) dst = os.path.join(path, file)
self.logger.info('Copying {} to {}'.format(src, dst)) self.logger.info('Copying %s to %s', src, dst)
sftp.put(src, dst) sftp.put(src, dst)
else: else:
if self.is_directory(sftp, remote_path): if self.is_directory(sftp, remote_path):
@ -475,7 +490,9 @@ class SshPlugin(Plugin):
client = self._connect(**kwargs) client = self._connect(**kwargs)
sftp = client.open_sftp() sftp = client.open_sftp()
def get_file_type(st_mode: int) -> str: def get_file_type(st_mode: Optional[int]) -> str:
if st_mode is None:
return 'unknown'
if S_ISDIR(st_mode): if S_ISDIR(st_mode):
return 'directory' return 'directory'
elif S_ISBLK(st_mode): elif S_ISBLK(st_mode):
@ -503,15 +520,23 @@ class SshPlugin(Plugin):
'longname': f.longname, 'longname': f.longname,
'attributes': f.attr, 'attributes': f.attr,
'type': get_file_type(f.st_mode), 'type': get_file_type(f.st_mode),
'access_time': datetime.datetime.fromtimestamp(f.st_atime), 'access_time': (
'modify_time': datetime.datetime.fromtimestamp(f.st_mtime), datetime.datetime.fromtimestamp(f.st_atime)
if f.st_atime
else None
),
'modify_time': (
datetime.datetime.fromtimestamp(f.st_mtime)
if f.st_mtime
else None
),
'uid': f.st_uid, 'uid': f.st_uid,
'gid': f.st_gid, 'gid': f.st_gid,
'size': f.st_size, 'size': f.st_size,
} }
for f in sftp.listdir_attr(path) for f in sftp.listdir_attr(path)
} }
else:
return sftp.listdir(path) return sftp.listdir(path)
finally: finally:
if not keep_alive: if not keep_alive:
@ -692,7 +717,7 @@ class SshPlugin(Plugin):
sftp = client.open_sftp() sftp = client.open_sftp()
try: try:
return sftp.getcwd() return sftp.getcwd() or '/'
finally: finally:
if not keep_alive: if not keep_alive:
host, port, user = self._get_host_port_user(**kwargs) host, port, user = self._get_host_port_user(**kwargs)
@ -704,24 +729,29 @@ class SshPlugin(Plugin):
local_port: int, local_port: int,
remote_host: str, remote_host: str,
remote_port: int, remote_port: int,
bind_addr: Optional[str] = '', bind_addr: str = '',
**kwargs, **kwargs,
): ):
""" """
Start an SSH forward tunnel, tunnelling <local_port> to <remote_host>:<remote_port>. Start an SSH forward tunnel, tunnelling ``<local_port>`` to
``<remote_host>:<remote_port>``.
:param local_port: Local port. :param local_port: Local port.
:param remote_host: Remote host. :param remote_host: Remote host.
:param remote_port: Remote port. :param remote_port: Remote port.
:param bind_addr: If set, the `local_port` will be bound to this address/subnet (default: '', or 0.0.0.0: any). :param bind_addr: If set, the `local_port` will be bound to this
:param kwargs: Arguments for :meth:`platypush.plugins.ssh.SshPlugin.connect`. address/subnet (default: '', or 0.0.0.0: any).
:param kwargs: Arguments for
:meth:`platypush.plugins.ssh.SshPlugin.connect`.
""" """
key = local_port, remote_host, remote_port key = local_port, remote_host, remote_port
if key in self._fwd_tunnels: if key in self._fwd_tunnels:
self.logger.info( self.logger.info(
'The tunnel {}:{}:{}:{} is already active'.format( 'The tunnel %s:%d:%s:%d is already active',
bind_addr, local_port, remote_host, remote_port bind_addr,
) local_port,
remote_host,
remote_port,
) )
return return
@ -753,9 +783,7 @@ class SshPlugin(Plugin):
key = (local_port, remote_host, remote_port) key = (local_port, remote_host, remote_port)
if key not in self._fwd_tunnels: if key not in self._fwd_tunnels:
self.logger.warning( self.logger.warning(
'No such forward tunnel: {}:{}:{}'.format( 'No such forward tunnel: %d:%s:%d', local_port, remote_host, remote_port
local_port, remote_host, remote_port
)
) )
return return
@ -772,7 +800,7 @@ class SshPlugin(Plugin):
server_port: int, server_port: int,
remote_host: str, remote_host: str,
remote_port: int, remote_port: int,
bind_addr: Optional[str] = '', bind_addr: str = '',
**kwargs, **kwargs,
): ):
""" """
@ -788,9 +816,11 @@ class SshPlugin(Plugin):
key = server_port, remote_host, remote_port key = server_port, remote_host, remote_port
if key in self._fwd_tunnels: if key in self._fwd_tunnels:
self.logger.info( self.logger.info(
'The tunnel {}:{}:{}:{} is already active'.format( 'The tunnel %s:%d:%s:%d is already active',
bind_addr, server_port, remote_host, remote_port bind_addr,
) server_port,
remote_host,
remote_port,
) )
return return
@ -823,9 +853,10 @@ class SshPlugin(Plugin):
key = (server_port, remote_host, remote_port) key = (server_port, remote_host, remote_port)
if key not in self._rev_tunnels: if key not in self._rev_tunnels:
self.logger.warning( self.logger.warning(
'No such reversed tunnel: {}:{}:{}'.format( 'No such reverse tunnel: %d:%s:%d',
server_port, remote_host, remote_port server_port,
) remote_host,
remote_port,
) )
return return

158
platypush/schemas/qrcode.py Normal file
View file

@ -0,0 +1,158 @@
import base64
from marshmallow import EXCLUDE, fields, pre_dump
from marshmallow.schema import Schema
class QrcodeGeneratedSchema(Schema):
"""
Schema for a QR code generation response.
"""
class Meta: # type: ignore
"""
Exclude unknown fields.
"""
unknown = EXCLUDE
text = fields.String(
required=True,
metadata={
'description': 'Text content of the QR code',
'example': 'https://platypush.tech',
},
)
data = fields.String(
metadata={
'description': 'Base64-encoded content of the QR code',
'example': 'iVBORw0KGgoAAAANSUhEUgAAAXIAAAFyAQAAAADAX2yk',
}
)
format = fields.String(
metadata={
'description': 'Format of the QR code image',
'example': 'png',
},
)
image_file = fields.String(
metadata={
'description': 'Path to the generated QR code image file',
'example': '/tmp/qr_code.png',
},
)
class QrcodeDecodedRectSchema(Schema):
"""
Schema for a single QR code decoding result rectangle.
"""
x = fields.Integer(
required=True,
metadata={
'description': 'X coordinate of the rectangle in the image',
'example': 0,
},
)
y = fields.Integer(
required=True,
metadata={
'description': 'Y coordinate of the rectangle in the image',
'example': 0,
},
)
width = fields.Integer(
required=True,
metadata={
'description': 'Width of the rectangle',
'example': 100,
},
)
height = fields.Integer(
required=True,
metadata={
'description': 'Height of the rectangle',
'example': 100,
},
)
class QrcodeDecodedResultSchema(Schema):
"""
Schema for a single QR code decoding result.
"""
data = fields.String(
required=True,
metadata={
'description': 'Decoded QRcode data, as a base64-encoded string if binary',
'example': 'https://platypush.tech',
},
)
type = fields.String(
required=True,
metadata={
'description': (
'Type of code that was decoded. Supports the types available under the '
'`pyzbar.ZBarSymbol` class: '
'https://github.com/NaturalHistoryMuseum/pyzbar/blob/master/pyzbar/wrapper.py#L43'
),
'example': 'QRCODE',
},
)
rect = fields.Nested(
QrcodeDecodedRectSchema,
required=True,
metadata={
'description': 'Rectangle in the image where the QR code was found',
},
)
@pre_dump
def pre_dump(self, data, **_):
if hasattr(data, '_asdict'):
data = data._asdict()
try:
data['data'] = data['data'].decode()
except (ValueError, TypeError):
data['data'] = base64.b64encode(data['data']).decode()
return data
class QrcodeDecodedSchema(Schema):
"""
Schema for a QR code decoding response.
"""
class Meta: # type: ignore
"""
Exclude unknown fields.
"""
unknown = EXCLUDE
results = fields.List(
fields.Nested(QrcodeDecodedResultSchema),
required=True,
metadata={
'description': 'Decoded QR code results',
},
)
image_file = fields.String(
metadata={
'description': 'Path to the image file that was decoded',
'example': '/tmp/qr_code.png',
},
)

View file

@ -97,6 +97,7 @@ mock_imports = [
"pyotp", "pyotp",
"pysmartthings", "pysmartthings",
"pyzbar", "pyzbar",
"qrcode",
"rtmidi", "rtmidi",
"samsungtvws", "samsungtvws",
"serial", "serial",