2019-07-09 01:44:31 +02:00
|
|
|
import base64
|
|
|
|
import json
|
|
|
|
|
|
|
|
from platypush.backend import Backend
|
|
|
|
from platypush.message.event.nfc import NFCTagDetectedEvent, NFCTagRemovedEvent, NFCDeviceConnectedEvent, \
|
|
|
|
NFCDeviceDisconnectedEvent
|
|
|
|
|
|
|
|
|
|
|
|
class NfcBackend(Backend):
|
|
|
|
"""
|
|
|
|
Backend to detect NFC card events from a compatible reader.
|
|
|
|
|
|
|
|
Triggers:
|
|
|
|
|
|
|
|
* :class:`platypush.message.event.nfc.NFCDeviceConnectedEvent` when an NFC reader/writer is connected
|
|
|
|
* :class:`platypush.message.event.nfc.NFCDeviceDisconnectedEvent` when an NFC reader/writer is disconnected
|
|
|
|
* :class:`platypush.message.event.nfc.NFCTagDetectedEvent` when an NFC tag is detected
|
|
|
|
* :class:`platypush.message.event.nfc.NFCTagRemovedEvent` when an NFC tag is removed
|
|
|
|
|
|
|
|
Requires:
|
|
|
|
|
2019-12-01 23:35:05 +01:00
|
|
|
* **nfcpy** >= 1.0 (``pip install 'nfcpy>=1.0'``)
|
|
|
|
* **ndef** (``pip install ndef``)
|
2019-07-13 22:36:45 +02:00
|
|
|
|
|
|
|
Run the following to check if your device is compatible with nfcpy and the right permissions are set::
|
|
|
|
|
|
|
|
python -m nfc
|
|
|
|
|
2019-07-09 01:44:31 +02:00
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, device='usb', *args, **kwargs):
|
|
|
|
"""
|
|
|
|
:param device: Address or ID of the device to be opened. Examples:
|
|
|
|
|
|
|
|
* `'usb:003:009'` opens device 9 on bus 3
|
|
|
|
* `'usb:003'` opens the first available device on bus 3
|
|
|
|
* `'usb'` opens the first available USB device (default)
|
|
|
|
"""
|
|
|
|
|
|
|
|
super().__init__(*args, **kwargs)
|
|
|
|
|
|
|
|
self.device_id = device
|
|
|
|
self._clf = None
|
|
|
|
|
|
|
|
def _get_clf(self):
|
2019-12-01 23:35:05 +01:00
|
|
|
import nfc
|
|
|
|
|
2019-07-09 01:44:31 +02:00
|
|
|
if not self._clf:
|
|
|
|
self._clf = nfc.ContactlessFrontend()
|
|
|
|
self._clf.open(self.device_id)
|
|
|
|
self.bus.post(NFCDeviceConnectedEvent(reader=self._get_device_str()))
|
|
|
|
self.logger.info('Initialized NFC reader backend on device {}'.format(self._get_device_str()))
|
|
|
|
|
|
|
|
return self._clf
|
|
|
|
|
|
|
|
def _get_device_str(self):
|
|
|
|
return str(self._clf.device)
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
if self._clf:
|
|
|
|
self._clf.close()
|
|
|
|
self._clf = None
|
|
|
|
self.bus.post(NFCDeviceDisconnectedEvent(reader=self._get_device_str()))
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _parse_records(tag):
|
2019-12-01 23:35:05 +01:00
|
|
|
import ndef
|
2019-07-09 01:44:31 +02:00
|
|
|
records = []
|
|
|
|
|
2019-12-08 21:25:39 +01:00
|
|
|
if not tag.ndef:
|
|
|
|
return records
|
|
|
|
|
2019-07-09 01:44:31 +02:00
|
|
|
for record in tag.ndef.records:
|
|
|
|
r = {
|
|
|
|
'record_type': record.type,
|
|
|
|
'record_name': record.name,
|
|
|
|
}
|
|
|
|
|
|
|
|
if isinstance(record, ndef.TextRecord):
|
|
|
|
try:
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'json',
|
|
|
|
'value': json.loads(record.text),
|
|
|
|
}
|
|
|
|
except ValueError:
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'text',
|
|
|
|
'text': record.text,
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.UriRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'uri',
|
|
|
|
'uri': record.uri,
|
|
|
|
'iri': record.iri,
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.SmartposterRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'smartposter',
|
|
|
|
**{attr: getattr(record, attr) for attr in ['resource', 'titles', 'title', 'action', 'icon',
|
|
|
|
'icons', 'resource_size', 'resource_type']},
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.DeviceInformationRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'device_info',
|
|
|
|
**{attr: getattr(record, attr) for attr in ['vendor_name', 'model_name', 'unique_name',
|
|
|
|
'uuid_string', 'version_string']},
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.WifiSimpleConfigRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'wifi_simple_config',
|
|
|
|
**{attr: record[attr] for attr in record.attribute_names()}
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.WifiPeerToPeerRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'wifi_peer_to_peer',
|
|
|
|
**{attr: record[attr] for attr in record.attribute_names()}
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.BluetoothEasyPairingRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'bluetooth_easy_pairing',
|
|
|
|
**{attr: getattr(record, attr) for attr in ['device_address', 'device_name', 'device_class']},
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.BluetoothLowEnergyRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'bluetooth_low_energy',
|
|
|
|
**{attr: getattr(record, attr) for attr in ['device_address', 'device_name', 'role_capabilities',
|
|
|
|
'appearance', 'flags', 'security_manager_tk_value',
|
|
|
|
'secure_connections_confirmation_value',
|
|
|
|
'secure_connections_random_value']},
|
|
|
|
}
|
|
|
|
elif isinstance(record, ndef.SignatureRecord):
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'signature',
|
|
|
|
**{attr: getattr(record, attr) for attr in ['version', 'signature_type', 'hash_type', 'signature',
|
|
|
|
'signature_uri', 'certificate_format',
|
|
|
|
'certificate_store', 'certificate_uri',
|
|
|
|
'secure_connections_random_value']},
|
|
|
|
}
|
|
|
|
else:
|
|
|
|
r = {
|
|
|
|
**r,
|
|
|
|
'type': 'binary',
|
|
|
|
'data': base64.encodebytes(record.data).decode(),
|
|
|
|
}
|
|
|
|
|
|
|
|
records.append(r)
|
|
|
|
|
|
|
|
return records
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _parse_id(tag):
|
|
|
|
return ''.join([('%02X' % c) for c in tag.identifier])
|
|
|
|
|
|
|
|
def _on_connect(self):
|
|
|
|
def callback(tag):
|
|
|
|
if not tag:
|
|
|
|
return False
|
|
|
|
|
|
|
|
tag_id = self._parse_id(tag)
|
|
|
|
records = self._parse_records(tag)
|
|
|
|
self.bus.post(NFCTagDetectedEvent(reader=self._get_device_str(), tag_id=tag_id, records=records))
|
|
|
|
return True
|
|
|
|
|
|
|
|
return callback
|
|
|
|
|
|
|
|
def _on_release(self):
|
|
|
|
def callback(tag):
|
|
|
|
tag_id = self._parse_id(tag)
|
|
|
|
self.bus.post(NFCTagRemovedEvent(reader=self._get_device_str(), tag_id=tag_id))
|
|
|
|
|
|
|
|
return callback
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
super().run()
|
|
|
|
|
|
|
|
while not self.should_stop():
|
|
|
|
try:
|
|
|
|
clf = self._get_clf()
|
|
|
|
clf.connect(rdwr={
|
|
|
|
'on-connect': self._on_connect(),
|
|
|
|
'on-release': self._on_release(),
|
|
|
|
})
|
|
|
|
finally:
|
|
|
|
self.close()
|
|
|
|
|
|
|
|
|
|
|
|
# vim:sw=4:ts=4:et:
|