platypush/platypush/plugins/nfc/__init__.py

282 lines
8.9 KiB
Python

import base64
import json
from platypush.message.event.nfc import (
NFCTagDetectedEvent,
NFCTagRemovedEvent,
NFCDeviceConnectedEvent,
NFCDeviceDisconnectedEvent,
)
from platypush.plugins import RunnablePlugin, action
class NfcPlugin(RunnablePlugin):
"""
Plugin to detect events from NFC devices.
Run the following command to check if your device is compatible with nfcpy
and the right permissions are set::
python -m nfc
"""
def __init__(self, device='usb', **kwargs):
"""
:param device: Address or ID of the device to be opened. Examples:
* ``usb:003:009`` opens device 9 on bus 3
* ``usb:003`` opens the first available device on bus 3
* ``usb`` opens the first available USB device (default)
"""
super().__init__(**kwargs)
self.device_id = device
self._clf = None
def _get_clf(self):
import nfc
if not self._clf:
self._clf = nfc.ContactlessFrontend()
self._clf.open(self.device_id)
assert self._clf and self._clf.device, 'NFC reader not initialized'
self._bus.post(NFCDeviceConnectedEvent(reader=self._get_device_str()))
self.logger.info(
'Initialized NFC reader on device %s', self._get_device_str()
)
return self._clf
def _get_device_str(self):
assert self._clf, 'NFC reader not initialized'
return str(self._clf.device)
def close(self):
if self._clf:
self._clf.close()
self._clf = None
self._bus.post(NFCDeviceDisconnectedEvent(reader=self._get_device_str()))
@staticmethod
def _parse_records(tag):
from ndef.text import TextRecord
from ndef.uri import UriRecord
from ndef.smartposter import SmartposterRecord
from ndef.deviceinfo import DeviceInformationRecord
from ndef.wifi import WifiSimpleConfigRecord, WifiPeerToPeerRecord
from ndef.bluetooth import BluetoothLowEnergyRecord, BluetoothEasyPairingRecord
from ndef.signature import SignatureRecord
records = []
if not tag.ndef:
return records
for record in tag.ndef.records:
r = {
'record_type': record.type,
'record_name': record.name,
}
if isinstance(record, TextRecord):
try:
r = {
**r,
'type': 'json',
'value': json.loads(record.text),
}
except ValueError:
r = {
**r,
'type': 'text',
'text': record.text,
}
elif isinstance(record, UriRecord):
r = {
**r,
'type': 'uri',
'uri': record.uri,
'iri': record.iri,
}
elif isinstance(record, SmartposterRecord):
r = {
**r,
'type': 'smartposter',
**{
attr: getattr(record, attr)
for attr in [
'resource',
'titles',
'title',
'action',
'icon',
'icons',
'resource_size',
'resource_type',
]
},
}
elif isinstance(record, DeviceInformationRecord):
r = {
**r,
'type': 'device_info',
**{
attr: getattr(record, attr)
for attr in [
'vendor_name',
'model_name',
'unique_name',
'uuid_string',
'version_string',
]
},
}
elif isinstance(record, WifiSimpleConfigRecord):
r = {
**r,
'type': 'wifi_simple_config',
**{attr: record[attr] for attr in record.attribute_names()},
}
elif isinstance(record, WifiPeerToPeerRecord):
r = {
**r,
'type': 'wifi_peer_to_peer',
**{attr: record[attr] for attr in record.attribute_names()},
}
elif isinstance(record, BluetoothEasyPairingRecord):
r = {
**r,
'type': 'bluetooth_easy_pairing',
**{
attr: getattr(record, attr)
for attr in ['device_address', 'device_name', 'device_class']
},
}
elif isinstance(record, BluetoothLowEnergyRecord):
r = {
**r,
'type': 'bluetooth_low_energy',
**{
attr: getattr(record, attr)
for attr in [
'device_address',
'device_name',
'role_capabilities',
'appearance',
'flags',
'security_manager_tk_value',
'secure_connections_confirmation_value',
'secure_connections_random_value',
]
},
}
elif isinstance(record, SignatureRecord):
r = {
**r,
'type': 'signature',
**{
attr: getattr(record, attr)
for attr in [
'version',
'signature_type',
'hash_type',
'signature',
'signature_uri',
'certificate_format',
'certificate_store',
'certificate_uri',
'secure_connections_random_value',
]
},
}
else:
r = {
**r,
'type': 'binary',
'data': base64.encodebytes(record.data).decode(),
}
records.append(r)
return records
@staticmethod
def _parse_id(tag):
return ''.join([('%02X' % c) for c in tag.identifier])
def _on_connect(self):
def callback(tag):
if not tag:
return False
tag_id = self._parse_id(tag)
records = self._parse_records(tag)
self._bus.post(
NFCTagDetectedEvent(
reader=self._get_device_str(), tag_id=tag_id, records=records
)
)
return True
return callback
def _on_release(self):
def callback(tag):
tag_id = self._parse_id(tag)
self._bus.post(
NFCTagRemovedEvent(reader=self._get_device_str(), tag_id=tag_id)
)
return callback
@action
def status(self):
"""
Get information about the NFC reader status.
Example output:
.. code-block:: json
{
"display_name": "ACS ACR122U PN532v1.6 at usb:001:017",
"path": "usb:001:017",
"product_name": "ACR122U",
"vendor_name": "ACS"
}
"""
assert self._clf and self._clf.device, 'NFC reader not initialized'
return {
'display_name': str(self._clf.device),
'path': self._clf.device.path,
'product_name': self._clf.device.product_name,
'vendor_name': self._clf.device.vendor_name,
}
def main(self):
fail_wait = 5
max_fail_wait = 60
while not self.should_stop():
try:
clf = self._get_clf()
clf.connect(
rdwr={
'on-connect': self._on_connect(),
'on-release': self._on_release(),
}
)
fail_wait = 5
except Exception as e:
self.logger.warning(
'NFC error, retrying in %d seconds: %s', e, fail_wait, exc_info=True
)
self.wait_stop(fail_wait)
fail_wait = min(fail_wait * 2, max_fail_wait)
finally:
self.close()
# vim:sw=4:ts=4:et: