platypush/platypush/plugins/bluetooth/__init__.py

432 lines
16 KiB
Python

import base64
import os
import re
import select
from typing import Dict, Optional
from platypush.plugins.sensor import SensorPlugin
from platypush.plugins import action
from platypush.message.response.bluetooth import BluetoothScanResponse, \
BluetoothLookupNameResponse, BluetoothLookupServiceResponse, BluetoothResponse
class BluetoothPlugin(SensorPlugin):
"""
Bluetooth plugin
Requires:
* **pybluez** (``pip install pybluez``)
* **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``)
"""
import bluetooth
class _DeviceDiscoverer(bluetooth.DeviceDiscoverer):
def __init__(self, name, *args, **kwargs):
super().__init__(*args, **kwargs)
self.name = name
self.device = {}
self.done = True
def pre_inquiry(self):
self.done = False
def device_discovered(self, dev_addr, dev_class, rssi, dev_name):
dev_name = dev_name.decode()
if dev_name == self.name:
self.device = {
'addr': dev_addr,
'name': dev_name,
'class': dev_class,
}
self.done = True
def inquiry_complete(self):
self.done = True
def __init__(self, device_id: int = -1, **kwargs):
"""
:param device_id: Default adapter device_id to be used (default: -1, auto)
"""
super().__init__(**kwargs)
self.device_id = device_id
self._devices = []
self._devices_by_addr = {}
self._devices_by_name = {}
self._port_and_protocol_by_addr_and_srv_uuid = {}
self._port_and_protocol_by_addr_and_srv_name = {}
self._socks = {}
def _get_device_addr(self, device):
if re.match('([0-9A-F]{2}:){5}[0-9A-F]{2}', device, re.IGNORECASE):
return device
if device in self._devices_by_name:
return self._devices_by_name[device]['addr']
return self.lookup_address(device).output['addr']
@action
def scan(self, device_id: Optional[int] = None, duration: int = 10) -> BluetoothScanResponse:
"""
Scan for nearby bluetooth devices
:param device_id: Bluetooth adapter ID to use (default configured if None)
:param duration: Scan duration in seconds
"""
from bluetooth import discover_devices
if device_id is None:
device_id = self.device_id
self.logger.debug('Discovering devices on adapter {}, duration: {} seconds'.format(
device_id, duration))
devices = discover_devices(duration=duration, lookup_names=True, lookup_class=True, device_id=device_id,
flush_cache=True)
response = BluetoothScanResponse(devices)
self._devices = response.devices
self._devices_by_addr = {dev['addr']: dev for dev in self._devices}
self._devices_by_name = {dev['name']: dev for dev in self._devices if dev.get('name')}
return response
@action
def get_measurement(self, device_id: Optional[int] = None, duration: Optional[int] = 10, *args, **kwargs) \
-> Dict[str, dict]:
"""
Wrapper for ``scan`` that returns bluetooth devices in a format usable by sensor backends.
:param device_id: Bluetooth adapter ID to use (default configured if None)
:param duration: Scan duration in seconds
:return: Device address -> info map.
"""
devices = self.scan(device_id=device_id, duration=duration).output
return {device['addr']: device for device in devices}
@action
def lookup_name(self, addr: str, timeout: int = 10) -> BluetoothLookupNameResponse:
"""
Look up the name of a nearby bluetooth device given the address
:param addr: Device address
:param timeout: Lookup timeout (default: 10 seconds)
"""
from bluetooth import lookup_name
self.logger.debug('Looking up name for device {}'.format(addr))
name = lookup_name(addr, timeout=timeout)
dev = {
'addr': addr,
'name': name,
'class': self._devices_by_addr.get(addr, {}).get('class'),
}
self._devices_by_addr[addr] = dev
if name:
self._devices_by_name[name] = dev
return BluetoothLookupNameResponse(addr=addr, name=name)
@action
def lookup_address(self, name: str, timeout: int = 10) -> BluetoothLookupNameResponse:
"""
Look up the address of a nearby bluetooth device given the name
:param name: Device name
:param timeout: Lookup timeout (default: 10 seconds)
"""
self.logger.info('Looking up address for device {}'.format(name))
discoverer = self._DeviceDiscoverer(name)
discoverer.find_devices(lookup_names=True, duration=timeout)
readfiles = [discoverer]
while True:
rfds = select.select(readfiles, [], [])[0]
if discoverer in rfds:
discoverer.process_event()
if discoverer.done:
break
dev = discoverer.device
if not dev:
raise RuntimeError('No such device: {}'.format(name))
addr = dev.get('addr')
self._devices_by_addr[addr] = dev
self._devices_by_name[name] = dev
return BluetoothLookupNameResponse(addr=addr, name=name)
@action
def find_service(self, name: str = None, addr: str = None, uuid: str = None) -> BluetoothLookupServiceResponse:
"""
Look up for a service published by a nearby bluetooth device. If all the parameters are null then all the
published services on the nearby devices will be returned. See
`:class:platypush.message.response.bluetoothBluetoothLookupServiceResponse` for response structure reference.
:param name: Service name
:param addr: Service/device address
:param uuid: Service UUID
"""
import bluetooth
from bluetooth import find_service
services = find_service(name=name, address=addr, uuid=uuid)
self._port_and_protocol_by_addr_and_srv_uuid.update({
(srv['host'], srv['service-id']): (srv['port'], getattr(bluetooth, srv['protocol']))
for srv in services if srv.get('service-id')
})
self._port_and_protocol_by_addr_and_srv_name.update({
(srv['host'], srv['name']): (srv['port'], getattr(bluetooth, srv['protocol']))
for srv in services if srv.get('name')
})
return BluetoothLookupServiceResponse(services)
def _get_sock(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None,
service_name: str = None, connect_if_closed=False):
sock = None
addr = self._get_device_addr(device)
if not (addr and port and protocol):
addr, port, protocol = self._get_addr_port_protocol(protocol=protocol, device=device, port=port,
service_uuid=service_uuid, service_name=service_name)
if (addr, port) in self._socks:
sock = self._socks[(addr, port)]
elif connect_if_closed:
self.connect(protocol=protocol, device=device, port=port, service_uuid=service_uuid,
service_name=service_name)
sock = self._socks[(addr, port)]
return sock
def _get_addr_port_protocol(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None,
service_name: str = None) -> tuple:
import bluetooth
addr = self._get_device_addr(device) if device else None
if service_uuid or service_name:
if addr:
if service_uuid:
(port, protocol) = self._port_and_protocol_by_addr_and_srv_uuid[(addr, service_uuid)] \
if (addr, service_uuid) in self._port_and_protocol_by_addr_and_srv_uuid else \
(None, None)
else:
(port, protocol) = self._port_and_protocol_by_addr_and_srv_name[(addr, service_name)] \
if (addr, service_name) in self._port_and_protocol_by_addr_and_srv_name else \
(None, None)
if not (addr and port):
self.logger.info('Discovering devices, service_name={name}, uuid={uuid}, address={addr}'.format(
name=service_name, uuid=service_uuid, addr=addr))
services = [
srv for srv in self.find_service().services
if (service_name is None or srv.get('name') == service_name) and
(addr is None or srv.get('host') == addr) and
(service_uuid is None or srv.get('service-id') == service_uuid)
]
if not services:
raise RuntimeError('No such service: name={name} uuid={uuid} address={addr}'.format(
name=service_name, uuid=service_uuid, addr=addr))
service = services[0]
addr = service['host']
port = service['port']
protocol = getattr(bluetooth, service['protocol'])
elif protocol:
if isinstance(protocol, str):
protocol = getattr(bluetooth, protocol)
else:
raise RuntimeError('No service name/UUID nor bluetooth protocol (RFCOMM/L2CAP) specified')
if not (addr and port):
raise RuntimeError('No valid device name/address, port, service name or UUID specified')
return addr, port, protocol
@action
def connect(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None,
service_name: str = None):
"""
Connect to a bluetooth device.
You can query the advertised services through ``find_service``.
:param protocol: Supported values: either 'RFCOMM'/'L2CAP' (str) or bluetooth.RFCOMM/bluetooth.L2CAP
int constants (int)
:param device: Device address or name
:param port: Port number
:param service_uuid: Service UUID
:param service_name: Service name
"""
from bluetooth import BluetoothSocket
addr, port, protocol = self._get_addr_port_protocol(protocol=protocol, device=device, port=port,
service_uuid=service_uuid, service_name=service_name)
sock = self._get_sock(protocol=protocol, device=addr, port=port)
if sock:
self.close(device=addr, port=port)
sock = BluetoothSocket(protocol)
self.logger.info('Opening connection to device {} on port {}'.format(addr, port))
sock.connect((addr, port))
self.logger.info('Connected to device {} on port {}'.format(addr, port))
self._socks[(addr, port)] = sock
@action
def close(self, device: str = None, port: int = None, service_uuid: str = None, service_name: str = None):
"""
Close an active bluetooth connection
:param device: Device address or name
:param port: Port number
:param service_uuid: Service UUID
:param service_name: Service name
"""
sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
if not sock:
self.logger.debug('Close on device {}({}) that is not connected'.format(device, port))
return
try:
sock.close()
except Exception as e:
self.logger.warning('Exception while closing previous connection to {}({}): {}'.format(
device, port, str(e)))
@action
def send(self, data, device: str = None, port: int = None, service_uuid: str = None, service_name: str = None,
binary: bool = False):
"""
Send data to an active bluetooth connection
:param data: Data to be sent
:param device: Device address or name
:param service_uuid: Service UUID
:param service_name: Service name
:param port: Port number
:param binary: Set to true if msg is a base64-encoded binary string
"""
from bluetooth import BluetoothError
sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name,
connect_if_closed=True)
if binary:
data = base64.decodebytes(data.encode() if isinstance(data, str) else data)
try:
sock.send(data)
except BluetoothError as e:
self.close(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
raise e
@action
def recv(self, device: str, port: int, service_uuid: str = None, service_name: str = None, size: int = 1024,
binary: bool = False) -> BluetoothResponse:
"""
Send data to an active bluetooth connection
:param device: Device address or name
:param port: Port number
:param service_uuid: Service UUID
:param service_name: Service name
:param size: Maximum number of bytes to be read
:param binary: Set to true to return a base64-encoded binary string
"""
from bluetooth import BluetoothError
sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name,
connect_if_closed=True)
if not sock:
self.connect(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
try:
data = sock.recv(size)
except BluetoothError as e:
self.close(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
raise e
if binary:
data = base64.encodebytes(data)
return BluetoothResponse(output=data.decode())
@action
def set_l2cap_mtu(self, mtu: int, device: str = None, port: int = None, service_name: str = None,
service_uuid: str = None):
"""
Set the L2CAP MTU (Maximum Transmission Unit) value for a connected bluetooth device.
Both the devices usually use the same MTU value over a connection.
:param device: Device address or name
:param port: Port number
:param service_uuid: Service UUID
:param service_name: Service name
:param mtu: New MTU value
"""
from bluetooth import BluetoothError, set_l2cap_mtu, L2CAP
sock = self._get_sock(protocol=L2CAP, device=device, port=port, service_uuid=service_uuid,
service_name=service_name, connect_if_closed=True)
if not sock:
raise RuntimeError('set_l2cap_mtu: device not connected')
try:
set_l2cap_mtu(sock, mtu)
except BluetoothError as e:
self.close(device=device, port=port, service_name=service_name, service_uuid=service_uuid)
raise e
@action
def send_file(self, filename: str, device: str, port: int = None, data=None, service_name='OBEX Object Push',
binary: bool = False):
"""
Send a local file to a device that exposes an OBEX Object Push service
:param filename: Path of the file to be sent
:param data: Alternatively to a file on disk you can send raw (string or binary) content
:param device: Device address or name
:param port: Port number
:param service_name: Service name
:param binary: Set to true if data is a base64-encoded binary string
"""
from PyOBEX.client import Client
if not data:
filename = os.path.abspath(os.path.expanduser(filename))
with open(filename, 'r') as f:
data = f.read()
filename = os.path.basename(filename)
else:
if binary:
data = base64.decodebytes(data.encode() if isinstance(data, str) else data)
addr, port, protocol = self._get_addr_port_protocol(device=device, port=port,
service_name=service_name)
client = Client(addr, port)
self.logger.info('Connecting to device {}'.format(addr))
client.connect()
self.logger.info('Sending file {} to device {}'.format(filename, addr))
client.put(filename, data)
self.logger.info('File {} sent to device {}'.format(filename, addr))
client.disconnect()
# vim:sw=4:ts=4:et: