# Forked from platypush/platypush (432 lines, 16 KiB).
import base64
import os
import re
import select
from typing import Dict, Optional
from platypush.plugins.sensor import SensorPlugin
from platypush.plugins import action
from platypush.message.response.bluetooth import BluetoothScanResponse, \
BluetoothLookupNameResponse, BluetoothLookupServiceResponse, BluetoothResponse
class BluetoothPlugin(SensorPlugin):
    """
    Bluetooth plugin

    Requires:

        * **pybluez** (``pip install pybluez``)
        * **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``)

    """

    # Imported in the class body so that ``bluetooth.DeviceDiscoverer`` is available as a base class
    # for the nested discoverer below.
    import bluetooth

    class _DeviceDiscoverer(bluetooth.DeviceDiscoverer):
        """
        Asynchronous discoverer that records the first device whose advertised name equals ``name``
        and flags itself as ``done`` either on a match or when the inquiry completes.
        """

        def __init__(self, name, *args, **kwargs):
            """
            :param name: Name of the device to look for
            """
            super().__init__(*args, **kwargs)
            self.name = name
            self.device = {}
            self.done = True

        def pre_inquiry(self):
            """Mark the discovery as in progress right before the inquiry starts."""
            self.done = False

        def device_discovered(self, dev_addr, dev_class, rssi, dev_name):
            """
            Store the discovered device and stop waiting if its name is the one being searched.
            """
            # Only decode when the name is delivered as bytes; a str is used as it is.
            if isinstance(dev_name, bytes):
                dev_name = dev_name.decode()

            if dev_name == self.name:
                self.device = {
                    'addr': dev_addr,
                    'name': dev_name,
                    'class': dev_class,
                }

                self.done = True

        def inquiry_complete(self):
            """Mark the discovery as finished when the inquiry ends (with or without a match)."""
            self.done = True

    def __init__(self, device_id: int = -1, **kwargs):
        """
        :param device_id: Default adapter device_id to be used (default: -1, auto)
        """
        # Forward the remaining configuration to the parent plugin; ``self.logger``, used by the methods
        # below, is not set anywhere in this class.
        super().__init__(**kwargs)
        self.device_id = device_id
        self._devices = []
        self._devices_by_addr = {}
        self._devices_by_name = {}
        # (address, service UUID) -> (port, protocol) and (address, service name) -> (port, protocol),
        # both filled by ``find_service``.
        self._port_and_protocol_by_addr_and_srv_uuid = {}
        self._port_and_protocol_by_addr_and_srv_name = {}
        # (address, port) -> connected socket
        self._socks = {}

    def _get_device_addr(self, device):
        """
        Resolve a device reference to its address.

        :param device: Device address or name
        :return: ``device`` itself if it looks like a MAC address, otherwise the address cached for that
            name or, as a last resort, the one returned by ``lookup_address``.
        """
        if re.match(r'([0-9A-F]{2}:){5}[0-9A-F]{2}', device, re.IGNORECASE):
            return device

        if device in self._devices_by_name:
            return self._devices_by_name[device]['addr']

        return self.lookup_address(device).output['addr']

    @action
    def scan(self, device_id: Optional[int] = None, duration: int = 10) -> BluetoothScanResponse:
        """
        Scan for nearby bluetooth devices

        :param device_id: Bluetooth adapter ID to use (default configured if None)
        :param duration: Scan duration in seconds
        """
        from bluetooth import discover_devices

        if device_id is None:
            device_id = self.device_id

        self.logger.debug('Discovering devices on adapter {}, duration: {} seconds'.format(
            device_id, duration))

        devices = discover_devices(duration=duration, lookup_names=True, lookup_class=True, device_id=device_id,
                                   flush_cache=True)
        response = BluetoothScanResponse(devices)

        # Refresh the caches used to resolve device names and addresses.
        self._devices = response.devices
        self._devices_by_addr = {dev['addr']: dev for dev in self._devices}
        self._devices_by_name = {dev['name']: dev for dev in self._devices if dev.get('name')}
        return response

    @action
    def get_measurement(self, device_id: Optional[int] = None, duration: Optional[int] = 10, *args, **kwargs) \
            -> Dict[str, dict]:
        """
        Wrapper for ``scan`` that returns bluetooth devices in a format usable by sensor backends.

        :param device_id: Bluetooth adapter ID to use (default configured if None)
        :param duration: Scan duration in seconds
        :return: Device address -> info map.
        """
        devices = self.scan(device_id=device_id, duration=duration).output
        return {device['addr']: device for device in devices}

    @action
    def lookup_name(self, addr: str, timeout: int = 10) -> BluetoothLookupNameResponse:
        """
        Look up the name of a nearby bluetooth device given the address

        :param addr: Device address
        :param timeout: Lookup timeout (default: 10 seconds)
        """
        from bluetooth import lookup_name

        self.logger.info('Looking up name for device {}'.format(addr))
        name = lookup_name(addr, timeout=timeout)

        dev = {
            'addr': addr,
            'name': name,
            # Keep the device class if the device was already seen by a previous scan.
            'class': self._devices_by_addr.get(addr, {}).get('class'),
        }

        self._devices_by_addr[addr] = dev
        if name:
            self._devices_by_name[name] = dev

        return BluetoothLookupNameResponse(addr=addr, name=name)

    @action
    def lookup_address(self, name: str, timeout: int = 10) -> BluetoothLookupNameResponse:
        """
        Look up the address of a nearby bluetooth device given the name

        :param name: Device name
        :param timeout: Lookup timeout (default: 10 seconds)
        :raises RuntimeError: If no device with the given name was discovered
        """
        self.logger.info('Looking up address for device {}'.format(name))
        discoverer = self._DeviceDiscoverer(name)
        discoverer.find_devices(lookup_names=True, duration=timeout)
        readfiles = [discoverer]

        # Pump discovery events until the device is found or the inquiry completes.
        while True:
            rfds = select.select(readfiles, [], [])[0]
            if discoverer in rfds:
                discoverer.process_event()

            if discoverer.done:
                break

        dev = discoverer.device
        if not dev:
            raise RuntimeError('No such device: {}'.format(name))

        addr = dev.get('addr')
        self._devices_by_addr[addr] = dev
        self._devices_by_name[name] = dev
        return BluetoothLookupNameResponse(addr=addr, name=name)

    @action
    def find_service(self, name: Optional[str] = None, addr: Optional[str] = None,
                     uuid: Optional[str] = None) -> BluetoothLookupServiceResponse:
        """
        Look up for a service published by a nearby bluetooth device. If all the parameters are null then all the
        published services on the nearby devices will be returned. See
        :class:`platypush.message.response.bluetooth.BluetoothLookupServiceResponse` for response structure
        reference.

        :param name: Service name
        :param addr: Service/device address
        :param uuid: Service UUID
        """
        import bluetooth
        from bluetooth import find_service

        services = find_service(name=name, address=addr, uuid=uuid)

        # Cache port and protocol of each service both by UUID and by name, so that later connections
        # don't require a new discovery.
        self._port_and_protocol_by_addr_and_srv_uuid.update({
            (srv['host'], srv['service-id']): (srv['port'], getattr(bluetooth, srv['protocol']))
            for srv in services if srv.get('service-id')
        })

        self._port_and_protocol_by_addr_and_srv_name.update({
            (srv['host'], srv['name']): (srv['port'], getattr(bluetooth, srv['protocol']))
            for srv in services if srv.get('name')
        })

        return BluetoothLookupServiceResponse(services)

    def _get_sock(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None,
                  service_name: str = None, connect_if_closed=False):
        """
        Get the socket registered for a device/port or service.

        :param protocol: Bluetooth protocol, used when a connection or a resolution is required
        :param device: Device address or name
        :param port: Port number
        :param service_uuid: Service UUID
        :param service_name: Service name
        :param connect_if_closed: If set, open a connection when no socket is registered
        :return: The registered socket, or None if there is none and ``connect_if_closed`` is not set
        """
        sock = None
        # A socket may be referenced by service only, in which case there is no device to resolve here.
        addr = self._get_device_addr(device) if device else None

        # Sockets are keyed by (address, port): when both are known and nothing else was specified,
        # the lookup needs neither a protocol nor a service resolution.
        lookup_only = addr and port and not (protocol or service_uuid or service_name)

        if not lookup_only and not (addr and port and protocol):
            addr, port, protocol = self._get_addr_port_protocol(protocol=protocol, device=device, port=port,
                                                                service_uuid=service_uuid,
                                                                service_name=service_name)

        if (addr, port) in self._socks:
            sock = self._socks[(addr, port)]
        elif connect_if_closed:
            self.connect(protocol=protocol, device=device, port=port, service_uuid=service_uuid,
                         service_name=service_name)
            sock = self._socks[(addr, port)]

        return sock

    def _get_addr_port_protocol(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None,
                                service_name: str = None) -> tuple:
        """
        Resolve address, port and protocol of a connection endpoint.

        If a service UUID or name is given, port and protocol are taken from the service cache when available,
        otherwise from a new service discovery. Without a service, a protocol (either a name exported by the
        ``bluetooth`` module, e.g. 'RFCOMM'/'L2CAP', or its constant) is required.

        :return: ``(addr, port, protocol)``
        :raises RuntimeError: If the service cannot be found or the endpoint is not fully specified
        """
        import bluetooth

        addr = self._get_device_addr(device) if device else None
        if service_uuid or service_name:
            if addr:
                # The UUID takes precedence over the name when both are specified.
                if service_uuid:
                    (port, protocol) = self._port_and_protocol_by_addr_and_srv_uuid.get(
                        (addr, service_uuid), (None, None))
                else:
                    (port, protocol) = self._port_and_protocol_by_addr_and_srv_name.get(
                        (addr, service_name), (None, None))

            if not (addr and port):
                self.logger.info('Discovering devices, service_name={name}, uuid={uuid}, address={addr}'.format(
                    name=service_name, uuid=service_uuid, addr=addr))

                services = [
                    srv for srv in self.find_service().services
                    if (service_name is None or srv.get('name') == service_name) and
                    (addr is None or srv.get('host') == addr) and
                    (service_uuid is None or srv.get('service-id') == service_uuid)
                ]

                if not services:
                    raise RuntimeError('No such service: name={name} uuid={uuid} address={addr}'.format(
                        name=service_name, uuid=service_uuid, addr=addr))

                service = services[0]
                addr = service['host']
                port = service['port']
                protocol = getattr(bluetooth, service['protocol'])
        elif protocol:
            if isinstance(protocol, str):
                protocol = getattr(bluetooth, protocol)
        else:
            raise RuntimeError('No service name/UUID nor bluetooth protocol (RFCOMM/L2CAP) specified')

        if not (addr and port):
            raise RuntimeError('No valid device name/address, port, service name or UUID specified')

        return addr, port, protocol

    @action
    def connect(self, protocol=None, device: str = None, port: int = None, service_uuid: str = None,
                service_name: str = None):
        """
        Connect to a bluetooth device.
        You can query the advertised services through ``find_service``.

        :param protocol: Supported values: either 'RFCOMM'/'L2CAP' (str) or bluetooth.RFCOMM/bluetooth.L2CAP
            int constants (int)
        :param device: Device address or name
        :param port: Port number
        :param service_uuid: Service UUID
        :param service_name: Service name
        """
        from bluetooth import BluetoothSocket

        addr, port, protocol = self._get_addr_port_protocol(protocol=protocol, device=device, port=port,
                                                            service_uuid=service_uuid, service_name=service_name)

        # Close any connection already registered for the same endpoint before opening a new one.
        sock = self._get_sock(protocol=protocol, device=addr, port=port)
        if sock:
            self.close(device=addr, port=port)

        sock = BluetoothSocket(protocol)
        self.logger.info('Opening connection to device {} on port {}'.format(addr, port))
        sock.connect((addr, port))
        self.logger.info('Connected to device {} on port {}'.format(addr, port))
        self._socks[(addr, port)] = sock

    @action
    def close(self, device: str = None, port: int = None, service_uuid: str = None, service_name: str = None):
        """
        Close an active bluetooth connection

        :param device: Device address or name
        :param port: Port number
        :param service_uuid: Service UUID
        :param service_name: Service name
        """
        sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name)

        if not sock:
            self.logger.debug('Close on device {}({}) that is not connected'.format(device, port))
            return

        try:
            sock.close()
        except Exception as e:
            # Best effort: a failure while closing must not prevent the caller from reconnecting.
            self.logger.warning('Exception while closing previous connection to {}({}): {}'.format(
                device, port, str(e)))
        finally:
            # Forget the socket, so that later calls don't reuse a closed connection.
            for key in [k for k, s in self._socks.items() if s is sock]:
                del self._socks[key]

    @action
    def send(self, data, device: str = None, port: int = None, service_uuid: str = None, service_name: str = None,
             binary: bool = False):
        """
        Send data to an active bluetooth connection

        :param data: Data to be sent
        :param device: Device address or name
        :param service_uuid: Service UUID
        :param service_name: Service name
        :param port: Port number
        :param binary: Set to true if msg is a base64-encoded binary string
        """
        from bluetooth import BluetoothError

        sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name,
                              connect_if_closed=True)

        if binary:
            data = base64.decodebytes(data.encode() if isinstance(data, str) else data)

        try:
            sock.send(data)
        except BluetoothError as e:
            self.close(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
            raise e

    @action
    def recv(self, device: str, port: int, service_uuid: str = None, service_name: str = None, size: int = 1024,
             binary: bool = False) -> BluetoothResponse:
        """
        Receive data from an active bluetooth connection

        :param device: Device address or name
        :param port: Port number
        :param service_uuid: Service UUID
        :param service_name: Service name
        :param size: Maximum number of bytes to be read
        :param binary: Set to true to return a base64-encoded binary string
        """
        from bluetooth import BluetoothError

        sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name,
                              connect_if_closed=True)

        if not sock:
            self.connect(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
            sock = self._get_sock(device=device, port=port, service_uuid=service_uuid, service_name=service_name)

        try:
            data = sock.recv(size)
        except BluetoothError as e:
            self.close(device=device, port=port, service_uuid=service_uuid, service_name=service_name)
            raise e

        if binary:
            data = base64.encodebytes(data)

        return BluetoothResponse(output=data.decode())

    @action
    def set_l2cap_mtu(self, mtu: int, device: str = None, port: int = None, service_name: str = None,
                      service_uuid: str = None):
        """
        Set the L2CAP MTU (Maximum Transmission Unit) value for a connected bluetooth device.
        Both the devices usually use the same MTU value over a connection.

        :param device: Device address or name
        :param port: Port number
        :param service_uuid: Service UUID
        :param service_name: Service name
        :param mtu: New MTU value
        :raises RuntimeError: If no connection to the device is available
        """
        from bluetooth import BluetoothError, set_l2cap_mtu, L2CAP

        sock = self._get_sock(protocol=L2CAP, device=device, port=port, service_uuid=service_uuid,
                              service_name=service_name, connect_if_closed=True)

        if not sock:
            raise RuntimeError('set_l2cap_mtu: device not connected')

        try:
            set_l2cap_mtu(sock, mtu)
        except BluetoothError as e:
            self.close(device=device, port=port, service_name=service_name, service_uuid=service_uuid)
            raise e

    @action
    def send_file(self, filename: str, device: str, port: int = None, data=None, service_name='OBEX Object Push',
                  binary: bool = False):
        """
        Send a local file to a device that exposes an OBEX Object Push service

        :param filename: Path of the file to be sent
        :param data: Alternatively to a file on disk you can send raw (string or binary) content
        :param device: Device address or name
        :param port: Port number
        :param service_name: Service name
        :param binary: Set to true if data is a base64-encoded binary string
        """
        from PyOBEX.client import Client

        if not data:
            filename = os.path.abspath(os.path.expanduser(filename))
            # Read the raw bytes: text mode would fail on (or alter) non-text files.
            with open(filename, 'rb') as f:
                data = f.read()
            filename = os.path.basename(filename)
        elif binary:
            data = base64.decodebytes(data.encode() if isinstance(data, str) else data)

        addr, port, protocol = self._get_addr_port_protocol(device=device, port=port,
                                                            service_name=service_name)

        client = Client(addr, port)
        self.logger.info('Connecting to device {}'.format(addr))
        client.connect()

        try:
            self.logger.info('Sending file {} to device {}'.format(filename, addr))
            client.put(filename, data)
            self.logger.info('File {} sent to device {}'.format(filename, addr))
        finally:
            # Always release the OBEX session, also when the transfer fails.
            client.disconnect()
# vim:sw=4:ts=4:et: