Removed legacy bluetooth backends.

No replacements have been made for the OBEX backends (push and file
services). PyOBEX is too broken and unmaintained, and there are too many
poorly documented steps required to get an unprivileged user to run an
SDP service.
This commit is contained in:
Fabio Manganiello 2023-03-24 16:41:30 +01:00
parent 3c355352c5
commit 567e9d4e21
Signed by: blacklight
GPG key ID: D90FBA7F76362774
5 changed files with 0 additions and 254 deletions

View file

@ -1,99 +0,0 @@
import os
import time
# noinspection PyPackageRequirements
from PyOBEX import headers, requests, responses
# noinspection PyPackageRequirements
from PyOBEX.server import Server
from platypush.backend import Backend
from platypush.message.event.bluetooth import BluetoothDeviceConnectedEvent, BluetoothFileReceivedEvent, \
BluetoothDeviceDisconnectedEvent, BluetoothFilePutRequestEvent
class BluetoothBackend(Backend, Server):
_sleep_on_error = 10.0
def __init__(self, address: str = '', port: int = None, directory: str = None, whitelisted_addresses=None,
**kwargs):
Backend.__init__(self, **kwargs)
Server.__init__(self, address=address)
self.port = port
self.directory = os.path.join(os.path.expanduser(directory))
self.whitelisted_addresses = whitelisted_addresses or []
self._sock = None
def run(self):
self.logger.info('Starting bluetooth service [address={}] [port={}]'.format(
self.address, self.port))
while not self.should_stop():
try:
# noinspection PyArgumentList
self._sock = self.start_service(self.port)
self.serve(self._sock)
except Exception as e:
self.logger.error('Error on bluetooth connection [address={}] [port={}]: {}'.format(
self.address, self.port, str(e)))
time.sleep(self._sleep_on_error)
finally:
self.stop()
def stop(self):
if self._sock:
self.stop_service(self._sock)
self._sock = None
def put(self, socket, request):
name = ""
body = ""
while True:
for header in request.header_data:
if isinstance(header, headers.Name):
name = header.decode()
self.logger.info("Receiving {}".format(name))
elif isinstance(header, headers.Length):
length = header.decode()
self.logger.info("Content length: {} bytes".format(length))
elif isinstance(header, headers.Body):
body += header.decode()
elif isinstance(header, headers.End_Of_Body):
body += header.decode()
if request.is_final():
break
# Ask for more data.
Server.send_response(self, socket, responses.Continue())
# Get the next part of the data.
request = self.request_handler.decode(socket)
Server.send_response(self, socket, responses.Success())
name = os.path.basename(name.strip("\x00"))
path = os.path.join(self.directory, name)
self.logger.info("Writing file {}" .format(path))
open(path, "wb").write(body.encode())
self.bus.post(BluetoothFileReceivedEvent(path=path))
def process_request(self, connection, request, *address):
if isinstance(request, requests.Connect):
self.connect(connection, request)
self.bus.post(BluetoothDeviceConnectedEvent(address=address[0], port=address[1]))
elif isinstance(request, requests.Disconnect):
self.disconnect(connection, request)
self.bus.post(BluetoothDeviceDisconnectedEvent(address=address[0], port=address[1]))
elif isinstance(request, requests.Put):
self.bus.post(BluetoothFilePutRequestEvent(address=address[0], port=address[1]))
self.put(connection, request)
else:
self._reject(connection)
self.bus.post(BluetoothFilePutRequestEvent(address=address[0], port=address[1]))
def accept_connection(self, address, port):
return address in self.whitelisted_addresses
# vim:sw=4:ts=4:et:

View file

@ -1,92 +0,0 @@
import os
import stat
# noinspection PyPackageRequirements
from PyOBEX import requests, responses, headers
# noinspection PyPackageRequirements
from PyOBEX.server import BrowserServer
from platypush.backend.bluetooth import BluetoothBackend
from platypush.message.event.bluetooth import BluetoothFileGetRequestEvent
class BluetoothFileserverBackend(BluetoothBackend, BrowserServer):
"""
Bluetooth OBEX file server.
Enable it to allow bluetooth devices to browse files on this machine.
If you run platypush as a non-root user (and you should) then you to change the group owner of the
service discovery protocol file (/var/run/sdp) and add your user to that group. See
`here <https://stackoverflow.com/questions/34599703/rfcomm-bluetooth-permission-denied-error-raspberry-pi>`_
for details.
Requires:
* **pybluez** (``pip install pybluez``)
* **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``)
"""
def __init__(self, port: int, address: str = '', directory: str = os.path.expanduser('~'),
whitelisted_addresses: list = None, **kwargs):
"""
:param port: Bluetooth listen port
:param address: Bluetooth address to bind the server to (default: any)
:param directory: Directory to share (default: HOME directory)
:param whitelisted_addresses: If set then only accept connections from the listed device addresses
"""
BluetoothBackend.__init__(self, address=address, port=port, directory=directory,
whitelisted_addresses=whitelisted_addresses, **kwargs)
if not os.path.isdir(self.directory):
raise FileNotFoundError(self.directory)
def process_request(self, socket, request, *address):
if isinstance(request, requests.Get):
self.bus.post(BluetoothFileGetRequestEvent(address=address[0], port=address[1]))
self.get(socket, request)
else:
super().process_request(socket, request, *address)
def get(self, socket, request):
name = ""
req_type = ""
for header in request.header_data:
if isinstance(header, headers.Name):
name = header.decode().strip("\x00")
self.logger.info("Receiving request for {}".format(name))
elif isinstance(header, headers.Type):
req_type = header.decode().strip("\x00")
self.logger.info("Request type: {}".format(req_type))
path = os.path.abspath(os.path.join(self.directory, name))
if os.path.isdir(path) or req_type == "x-obex/folder-listing":
if path.startswith(self.directory):
filelist = os.listdir(path)
s = '<?xml version="1.0"?>\n<folder-listing>\n'
for i in filelist:
objpath = os.path.join(path, i)
if os.path.isdir(objpath):
s += ' <folder name="{}" created="{}" />'.format(i, os.stat(objpath)[stat.ST_CTIME])
else:
s += ' <file name="{}" created="{}" size="{}" />'.format(
i, os.stat(objpath)[stat.ST_CTIME], os.stat(objpath)[stat.ST_SIZE])
s += "</folder-listing>\n"
self.logger.debug('Bluetooth get XML output:\n' + s)
response = responses.Success()
response_headers = [headers.Name(name.encode("utf8")),
headers.Length(len(s)),
headers.Body(s.encode("utf8"))]
BrowserServer.send_response(self, socket, response, response_headers)
else:
self._reject(socket)
else:
self._reject(socket)
# vim:sw=4:ts=4:et:

View file

@ -1,8 +0,0 @@
manifest:
events: {}
install:
pip:
- pybluez
- pyobex
package: platypush.backend.bluetooth.fileserver
type: backend

View file

@ -1,47 +0,0 @@
import os
# noinspection PyPackageRequirements
from PyOBEX.server import PushServer
from platypush.backend.bluetooth import BluetoothBackend
class BluetoothPushserverBackend(BluetoothBackend, PushServer):
"""
Bluetooth OBEX push server.
Enable it to allow bluetooth file transfers from other devices.
If you run platypush as a non-root user (and you should) then you to change the group owner of the
service discovery protocol file (/var/run/sdp) and add your user to that group. See
`here <https://stackoverflow.com/questions/34599703/rfcomm-bluetooth-permission-denied-error-raspberry-pi>`_
for details.
Requires:
* **pybluez** (``pip install pybluez``)
* **pyobex** (``pip install git+https://github.com/BlackLight/PyOBEX``)
"""
_sleep_on_error = 10.0
def __init__(self, port: int, address: str = '',
directory: str = os.path.join(os.path.expanduser('~'), 'bluetooth'),
whitelisted_addresses: list = None, **kwargs):
"""
:param port: Bluetooth listen port
:param address: Bluetooth address to bind the server to (default: any)
:param directory: Destination directory where files will be downloaded (default: ~/bluetooth)
:param whitelisted_addresses: If set then only accept connections from the listed device addresses
"""
BluetoothBackend.__init__(self, address=address, port=port, directory=directory,
whitelisted_addresses=whitelisted_addresses, **kwargs)
def run(self):
if not os.path.isdir(self.directory):
os.makedirs(self.directory, exist_ok=True)
super().run()
# vim:sw=4:ts=4:et:

View file

@ -1,8 +0,0 @@
manifest:
events: {}
install:
pip:
- pybluez
- pyobex
package: platypush.backend.bluetooth.pushserver
type: backend