platypush/platypush/backend/gps/__init__.py
Fabio Manganiello c3337ccc6c
[#311] Docs deps autogen sphinx plugin.
Added an `add_dependencies` plugin to the Sphinx build process that
parses the manifest files of the scanned backends and plugins and
automatically generates the documentation for the required dependencies
and triggered events.

This means that those dependencies are no longer required to be listed
in the docstring of the class itself.

Also in this commit:

- Black/LINT for some integrations that hadn't been touched in a long
  time.

- Deleted some leftovers from previous refactors (deprecated
  `backend.mqtt`, `backend.zwave.mqtt`, `backend.http.request.rss`).

- Deleted deprecated `inotify` backend - replaced by `file.monitor` (see
  #289).
2023-09-24 17:00:08 +02:00

145 lines
5.2 KiB
Python

import threading
import time
from platypush.backend import Backend
from platypush.message.event.gps import GPSVersionEvent, GPSDeviceEvent, GPSUpdateEvent
class GpsBackend(Backend):
    """
    This backend can interact with a GPS device and listen for events.

    Once gpsd is installed, you need to run it and associate it to your
    device. Example if your GPS device communicates over USB and is available
    on /dev/ttyUSB0::

        [sudo] gpsd /dev/ttyUSB0 -F /var/run/gpsd.sock

    The best option is probably to run gpsd at startup as a systemd service.
    """

    # Seconds to wait before retrying after a failed read from gpsd.
    _fail_sleep_time = 5.0
    # Minimum latitude/longitude change required to post a new position update.
    _lat_lng_tolerance = 1e-5
    # Minimum altitude change required to post a new position update.
    _alt_tolerance = 0.5

    def __init__(self, gpsd_server='localhost', gpsd_port=2947, **kwargs):
        """
        :param gpsd_server: gpsd daemon server name/address (default: localhost)
        :type gpsd_server: str
        :param gpsd_port: Port of the gpsd daemon (default: 2947)
        :type gpsd_port: int or str
        """
        super().__init__(**kwargs)
        self.gpsd_server = gpsd_server
        self.gpsd_port = gpsd_port
        self._session = None
        self._session_lock = threading.RLock()
        # Last known device report, indexed by device path.
        self._devices = {}

    def _get_session(self):
        """
        Return the cached gpsd session, creating it (and enabling streaming)
        if none is currently open.
        """
        # Imported lazily so that the module can be loaded without the gps
        # package being installed.
        import gps

        with self._session_lock:
            if not self._session:
                self._session = gps.gps(
                    host=self.gpsd_server, port=self.gpsd_port, reconnect=True
                )
                self._session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)

            return self._session

    def _close_session(self):
        """
        Forget the cached session, so that the next call to
        :meth:`_get_session` opens a new one, and close the old one on a
        best-effort basis.
        """
        with self._session_lock:
            session, self._session = self._session, None

        if session is None:
            return

        try:
            session.close()
        except Exception as e:
            # The session is being discarded anyway: a failure here must not
            # prevent the reconnection logic from running.
            self.logger.debug('Error while closing the gpsd session: %s', e)

    @staticmethod
    def _device_event(device):
        """
        Build a :class:`GPSDeviceEvent` out of a device description.

        :param device: Mapping with the device attributes reported by gpsd.
        """
        return GPSDeviceEvent(
            path=device.get('path'),
            activated=device.get('activated'),
            native=device.get('native'),
            bps=device.get('bps'),
            parity=device.get('parity'),
            stopbits=device.get('stopbits'),
            cycle=device.get('cycle'),
            driver=device.get('driver'),
        )

    def _gps_report_to_event(self, report):
        """
        Convert a report read from the gpsd session into a platypush event.

        :param report: Report object exposing a dict-like ``get`` method.
        :return: A :class:`GPSVersionEvent`, :class:`GPSDeviceEvent` or
            :class:`GPSUpdateEvent`, or ``None`` if the report class is not
            handled or a ``devices`` report contains no new/changed device.
        """
        # A report without a class is treated as unknown instead of raising.
        report_class = (report.get('class') or '').lower()

        if report_class == 'version':
            return GPSVersionEvent(
                release=report.get('release'),
                rev=report.get('rev'),
                proto_major=report.get('proto_major'),
                proto_minor=report.get('proto_minor'),
            )

        if report_class == 'devices':
            for device in report.get('devices') or []:
                path = device.get('path')
                # Compare against the entry stored for *this* device path, so
                # that an event is only generated for new or changed devices.
                if path not in self._devices or device != self._devices[path]:
                    self._devices[path] = device
                    # Only one event can be returned per report: the first
                    # new/changed device wins.
                    return self._device_event(device)

            return None

        if report_class == 'device':
            self._devices[report.get('path')] = report
            return self._device_event(report)

        if report_class == 'tpv':
            return GPSUpdateEvent(
                device=report.get('device'),
                latitude=report.get('lat'),
                longitude=report.get('lon'),
                altitude=report.get('alt'),
                mode=report.get('mode'),
                epv=report.get('epv'),
                eph=report.get('eph'),
                sep=report.get('sep'),
            )

        return None

    def _should_post(self, event, last_update):
        """
        Tell whether an event should be posted on the bus.

        Version and device events are always posted. Position updates are
        posted only if no previous update exists, or if latitude, longitude or
        altitude moved by at least the configured tolerance since the last
        posted update. Missing coordinates are treated as 0.

        :param event: Event returned by :meth:`_gps_report_to_event`.
        :param last_update: Last posted :class:`GPSUpdateEvent`, if any.
        """
        if last_update is None or not isinstance(event, GPSUpdateEvent):
            return True

        tolerances = (
            ('latitude', self._lat_lng_tolerance),
            ('longitude', self._lat_lng_tolerance),
            ('altitude', self._alt_tolerance),
        )

        return any(
            abs((last_update.args.get(attr) or 0) - (event.args.get(attr) or 0))
            >= tolerance
            for attr, tolerance in tolerances
        )

    def run(self):
        """
        Main loop: read reports from gpsd, convert them to events and post
        them on the bus until the backend is asked to stop. On failure the
        session is dropped and the read is retried after
        ``_fail_sleep_time`` seconds.
        """
        super().run()
        self.logger.info(
            'Initialized GPS backend on %s:%s', self.gpsd_server, self.gpsd_port
        )

        # Last position update posted on the bus. Only position updates are
        # tracked, so version/device events don't disturb the tolerance check.
        last_update = None

        while not self.should_stop():
            try:
                session = self._get_session()
                report = session.next()
                event = self._gps_report_to_event(report)

                if event and self._should_post(event, last_update):
                    self.bus.post(event)
                    if isinstance(event, GPSUpdateEvent):
                        last_update = event
            except Exception as e:
                if isinstance(e, StopIteration):
                    self.logger.warning(
                        'GPS service connection lost, check that gpsd is running'
                    )
                else:
                    self.logger.exception(e)

                # Force a new connection on the next iteration.
                self._close_session()
                time.sleep(self._fail_sleep_time)

        self._close_session()
# vim:sw=4:ts=4:et: