[#348] Converted `gps` backend into a plugin.
continuous-integration/drone/push Build is passing Details

Closes: #348
This commit is contained in:
Fabio Manganiello 2024-02-05 02:13:49 +01:00
parent 8351463a11
commit 2b595623b3
Signed by: blacklight
GPG Key ID: D90FBA7F76362774
11 changed files with 538 additions and 197 deletions

View File

@ -9,7 +9,6 @@ Backends
platypush/backend/button.flic.rst
platypush/backend/camera.pi.rst
platypush/backend/chat.telegram.rst
platypush/backend/gps.rst
platypush/backend/http.rst
platypush/backend/midi.rst
platypush/backend/music.mopidy.rst

View File

@ -1,6 +0,0 @@
``gps``
=========================
.. automodule:: platypush.backend.gps
:members:

View File

@ -0,0 +1,5 @@
``gps``
=======
.. automodule:: platypush.plugins.gps
:members:

View File

@ -46,6 +46,7 @@ Plugins
platypush/plugins/gotify.rst
platypush/plugins/gpio.rst
platypush/plugins/gpio.zeroborg.rst
platypush/plugins/gps.rst
platypush/plugins/graphite.rst
platypush/plugins/hid.rst
platypush/plugins/http.request.rst

View File

@ -1,145 +0,0 @@
import threading
import time
from platypush.backend import Backend
from platypush.message.event.gps import GPSVersionEvent, GPSDeviceEvent, GPSUpdateEvent
class GpsBackend(Backend):
"""
This backend can interact with a GPS device and listen for events.
Once installed gpsd you need to run it and associate it to your device. Example if your GPS device communicates
over USB and is available on /dev/ttyUSB0::
[sudo] gpsd /dev/ttyUSB0 -F /var/run/gpsd.sock
The best option is probably to run gpsd at startup as a systemd service.
"""
_fail_sleep_time = 5.0
_lat_lng_tolerance = 1e-5
_alt_tolerance = 0.5
def __init__(self, gpsd_server='localhost', gpsd_port=2947, **kwargs):
"""
:param gpsd_server: gpsd daemon server name/address (default: localhost)
:type gpsd_server: str
:param gpsd_port: Port of the gpsd daemon (default: 2947)
:type gpsd_port: int or str
"""
super().__init__(**kwargs)
self.gpsd_server = gpsd_server
self.gpsd_port = gpsd_port
self._session = None
self._session_lock = threading.RLock()
self._devices = {}
def _get_session(self):
import gps
with self._session_lock:
if not self._session:
self._session = gps.gps(
host=self.gpsd_server, port=self.gpsd_port, reconnect=True
)
self._session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)
return self._session
def _gps_report_to_event(self, report):
if report.get('class').lower() == 'version':
return GPSVersionEvent(
release=report.get('release'),
rev=report.get('rev'),
proto_major=report.get('proto_major'),
proto_minor=report.get('proto_minor'),
)
if report.get('class').lower() == 'devices':
for device in report.get('devices', []):
if device.get(
'path'
) not in self._devices or device != self._devices.get('path'):
# noinspection DuplicatedCode
self._devices[device.get('path')] = device
return GPSDeviceEvent(
path=device.get('path'),
activated=device.get('activated'),
native=device.get('native'),
bps=device.get('bps'),
parity=device.get('parity'),
stopbits=device.get('stopbits'),
cycle=device.get('cycle'),
driver=device.get('driver'),
)
if report.get('class').lower() == 'device':
# noinspection DuplicatedCode
self._devices[report.get('path')] = report
return GPSDeviceEvent(
path=report.get('path'),
activated=report.get('activated'),
native=report.get('native'),
bps=report.get('bps'),
parity=report.get('parity'),
stopbits=report.get('stopbits'),
cycle=report.get('cycle'),
driver=report.get('driver'),
)
if report.get('class').lower() == 'tpv':
return GPSUpdateEvent(
device=report.get('device'),
latitude=report.get('lat'),
longitude=report.get('lon'),
altitude=report.get('alt'),
mode=report.get('mode'),
epv=report.get('epv'),
eph=report.get('eph'),
sep=report.get('sep'),
)
def run(self):
super().run()
self.logger.info(
'Initialized GPS backend on {}:{}'.format(self.gpsd_server, self.gpsd_port)
)
last_event = None
while not self.should_stop():
try:
session = self._get_session()
report = session.next()
event = self._gps_report_to_event(report)
if event and (
last_event is None
or abs(
(last_event.args.get('latitude') or 0)
- (event.args.get('latitude') or 0)
)
>= self._lat_lng_tolerance
or abs(
(last_event.args.get('longitude') or 0)
- (event.args.get('longitude') or 0)
)
>= self._lat_lng_tolerance
or abs(
(last_event.args.get('altitude') or 0)
- (event.args.get('altitude') or 0)
)
>= self._alt_tolerance
):
self.bus.post(event)
last_event = event
except Exception as e:
if isinstance(e, StopIteration):
self.logger.warning(
'GPS service connection lost, check that gpsd is running'
)
else:
self.logger.exception(e)
self._session = None
time.sleep(self._fail_sleep_time)
# vim:sw=4:ts=4:et:

View File

@ -1,20 +0,0 @@
manifest:
events:
platypush.message.event.gps.GPSDeviceEvent: when a GPS device is connected or
updated
platypush.message.event.gps.GPSUpdateEvent: when a GPS device has new data
platypush.message.event.gps.GPSVersionEvent: when a GPS device advertises its
version data
install:
apk:
- gpsd
apt:
- gpsd
dnf:
- gpsd
pacman:
- gpsd
pip:
- gps
package: platypush.backend.gps
type: backend

View File

@ -1,44 +1,97 @@
from abc import ABC
from datetime import datetime
from typing import Optional
from platypush.message.event import Event
class GPSEvent(Event):
class GPSEvent(Event, ABC):
"""
Generic class for GPS events
Generic class for GPS events.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class GPSVersionEvent(GPSEvent):
"""
Event usually triggered on startup or reconnection, when the GPS device advertises its version parameters
"""
def __init__(self, release=None, rev=None, proto_major=None, proto_minor=None, *args, **kwargs):
super().__init__(release=release, rev=rev, proto_major=proto_major, proto_minor=proto_minor, *args, **kwargs)
class GPSDeviceEvent(GPSEvent):
"""
Event triggered when a new GPS device is connected or reconfigured
Event triggered when a new GPS device is connected or reconfigured.
"""
def __init__(self, path, activated=None, native=False, bps=None, parity=None, stopbits=None,
cycle=None, driver=None, *args, **kwargs):
super().__init__(*args, path=path, activated=activated, native=native, bps=bps, parity=parity,
stopbits=stopbits, cycle=cycle, driver=driver, **kwargs)
def __init__(
self,
path: str,
*args,
activated: Optional[datetime] = None,
native: bool = False,
baudrate: Optional[int] = None,
parity: Optional[str] = None,
stopbits: Optional[int] = None,
cycle: Optional[float] = None,
driver: Optional[str] = None,
subtype: Optional[str] = None,
**kwargs
):
"""
:param path: Device path.
:param activated: Device activation timestamp.
:param native: Device native status.
:param baudrate: Device baudrate.
:param parity: Device parity.
:param stopbits: Device stopbits.
:param cycle: Device cycle.
:param driver: Device driver.
:param subtype: Device subtype.
"""
super().__init__(
*args,
path=path,
activated=activated,
native=native,
baudrate=baudrate,
parity=parity,
stopbits=stopbits,
cycle=cycle,
driver=driver,
subtype=subtype,
**kwargs
)
class GPSUpdateEvent(GPSEvent):
class GPSLocationUpdateEvent(GPSEvent):
"""
Event triggered upon GPS status update
Event triggered upon GPS status update.
"""
def __init__(self, device=None, latitude=None, longitude=None, altitude=None, mode=None, epv=None, eph=None,
sep=None, *args, **kwargs):
super().__init__(*args, device=device, latitude=latitude, longitude=longitude, altitude=altitude,
mode=mode, epv=epv, eph=eph, sep=sep, **kwargs)
def __init__(
self,
*args,
device=None,
latitude=None,
longitude=None,
altitude=None,
mode=None,
**kwargs
):
super().__init__(
*args,
device=device,
latitude=latitude,
longitude=longitude,
altitude=altitude,
mode=mode,
**kwargs
)
class GPSEnabledEvent(GPSEvent):
"""
Event triggered when the GPS polling is enabled.
"""
class GPSDisabledEvent(GPSEvent):
"""
Event triggered when the GPS polling is disabled.
"""
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,216 @@
import threading
from contextlib import contextmanager
from dataclasses import asdict
from datetime import datetime
from platypush.plugins import RunnablePlugin, action
from platypush.message.event.gps import (
GPSDeviceEvent,
GPSDisabledEvent,
GPSEnabledEvent,
GPSLocationUpdateEvent,
)
from platypush.schemas.gps import GpsDeviceSchema, GpsStatusSchema
from ._model import DeviceMode, GpsDevice, GpsStatus
class GpsPlugin(RunnablePlugin):
"""
This plugin can interact with a GPS device compatible with `gpsd
<https://gpsd.io/>`_ and emit events when the location changes.
It requires ``gpsd`` to run on a system with a compatible GPS device
connected - most of the off-the-shelf GPS devices over USB or serial
interfaces should tick the box.
For example, if your GPS device communicates over USB and is available on
/dev/ttyUSB0, you can start the gpsd daemon with the following command
before starting Platypush::
[sudo] gpsd /dev/ttyUSB0 [-S 2947]
It will expose GPS events over the port ``2947`` by default, and you can
subscribe to them through this plugin.
"""
_default_gpsd_port = 2947
_default_poll_interval = 5.0
_lat_lng_tolerance = 1e-5
_alt_tolerance = 0.5
def __init__(
self,
gpsd_server: str = 'localhost',
gpsd_port: int = _default_gpsd_port,
poll_interval: float = _default_poll_interval,
enable_on_start: bool = True,
**kwargs,
):
"""
:param gpsd_server: gpsd daemon server name/address (default: localhost).
:param gpsd_port: Port of the gpsd daemon (default: 2947).
:param poll_interval: How long to wait before polling the GPS device
again in case of error (default: 5 seconds).
:param enable_on_start: If True, the GPS polling will be enabled when the
plugin starts (default: True). Otherwise, it'll have to be enabled by
calling the :meth:`.enable` action.
"""
super().__init__(poll_interval=poll_interval, **kwargs)
self.gpsd_server = gpsd_server
self.gpsd_port = gpsd_port
self._enable_on_start = enable_on_start
self._session = None
self._session_lock = threading.RLock()
self._status = GpsStatus()
@contextmanager
def _get_session(self):
import gps
with self._session_lock:
if not self._session:
self._session = gps.gps(
host=self.gpsd_server, port=self.gpsd_port, reconnect=True # type: ignore
)
yield self._session
with self._session_lock:
try:
self.disable()
except Exception as e:
self.logger.warning('Error disabling GPSD watch: %s', e)
self._session.close()
self._session = None
def _update_device(self, device: dict):
path = device.get('path')
if not path:
return
cur_dev = self._status.devices.get(path)
new_dev = GpsDevice(**GpsDeviceSchema().load(device)) # type: ignore
if cur_dev and asdict(cur_dev) == asdict(new_dev):
return
self._status.devices[path] = new_dev
self._bus.post(GPSDeviceEvent(**asdict(new_dev)))
def _handle_location_update(self, report: dict):
dev, lat, long, alt, mode, t = (
report.get('device'),
report.get('lat'),
report.get('lon'),
report.get('alt'),
report.get('mode'),
report.get('time'),
)
if not (dev and lat and long and mode):
return
dev_mode = DeviceMode(mode)
self._status.timestamp = datetime.fromisoformat(t) if t else None
self._status.devices[dev].mode = dev_mode
if not (
abs((self._status.latitude or 0) - lat) >= self._lat_lng_tolerance
or abs((self._status.longitude or 0) - long) >= self._lat_lng_tolerance
or abs((self._status.altitude or 0) - (alt or 0)) >= self._alt_tolerance
):
return
event = GPSLocationUpdateEvent(
device=dev,
latitude=lat,
longitude=long,
altitude=alt,
mode=dev_mode.name,
)
self._status.latitude = lat
self._status.longitude = long
self._status.altitude = alt
self._bus.post(event)
def _handle_report(self, report: dict):
cls = report['class'].lower()
if cls == 'version':
self.logger.info('Received GPSD version event: %s', dict(report))
return
if cls == 'watch':
evt_type = GPSEnabledEvent if report.get('enable') else GPSDisabledEvent
self._bus.post(evt_type())
return
if cls == 'devices':
for device in report.get('devices', []):
self._update_device(device)
return
if cls == 'device':
self._update_device(report)
return
if cls == 'tpv':
self._handle_location_update(report)
@action
def enable(self):
"""
Enable the GPS polling.
"""
import gps
assert self._session, 'GPSD session not initialized'
self._session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)
@action
def disable(self):
"""
Disable the GPS polling.
"""
import gps
assert self._session, 'GPSD session not initialized'
self._session.stream(gps.WATCH_DISABLE)
@action
def status(self):
"""
:returns: The current GPS status:
.. schema:: gps.GpsStatusSchema
"""
return GpsStatusSchema().dump(self._status)
def main(self):
while not self.should_stop():
first_run = True
try:
with self._get_session() as session:
if first_run and self._enable_on_start:
self.enable()
first_run = False
while not self.should_stop():
report: dict = session.next() # type: ignore
self._handle_report(report)
except Exception as e:
if isinstance(e, StopIteration):
self.logger.warning(
'GPS service connection lost, check that gpsd is running'
)
else:
self.logger.exception(e)
self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et:

View File

@ -0,0 +1,47 @@
from datetime import datetime
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Optional
class DeviceMode(Enum):
"""
GPS device mode.
"""
NO_FIX = 1 # No fix
TWO_D = 2 # 2D fix
THREE_D = 3 # 3D fix (including altitude)
@dataclass
class GpsDevice:
"""
Models the GPS device.
"""
path: str
activated: Optional[datetime] = None
native: bool = False
baudrate: Optional[int] = None
parity: str = 'N'
stopbits: Optional[int] = None
cycle: Optional[float] = None
driver: Optional[str] = None
subtype: Optional[str] = None
mode: Optional[DeviceMode] = None
@dataclass
class GpsStatus:
"""
Models the status of the GPS service.
"""
latitude: Optional[float] = None
longitude: Optional[float] = None
altitude: Optional[float] = None
speed: Optional[float] = None
satellites_used: int = 0
devices: Dict[str, GpsDevice] = field(default_factory=dict)
timestamp: Optional[datetime] = None

View File

@ -0,0 +1,22 @@
manifest:
events:
- platypush.message.event.gps.GPSDeviceEvent
- platypush.message.event.gps.GPSDisabledEvent
- platypush.message.event.gps.GPSEnabledEvent
- platypush.message.event.gps.GPSLocationUpdateEvent
install:
apk:
- gpsd
- py3-gpsd
apt:
- gpsd
- python3-gps
dnf:
- gpsd
- python-gpsd
pacman:
- gpsd
pip:
- gps
package: platypush.plugins.gps
type: plugin

169
platypush/schemas/gps.py Normal file
View File

@ -0,0 +1,169 @@
from marshmallow import EXCLUDE, fields, pre_load
from marshmallow.schema import Schema
from platypush.schemas import DateTime
class GpsDeviceSchema(Schema):
"""
Schema for a GPS device.
"""
# pylint: disable=too-few-public-methods
class Meta: # type: ignore
"""
Exclude unknown fields from the deserialized output.
"""
unknown = EXCLUDE
path = fields.String(
required=True,
metadata={
"description": "Device path",
"example": "/dev/ttyUSB0",
},
)
activated = DateTime(
metadata={
"description": "Device activation status",
"example": True,
},
)
native = fields.Boolean(
metadata={
"description": "Device native status",
"example": False,
},
)
baudrate = fields.Integer(
data_key="bps",
metadata={
"description": "Device baudrate",
"example": 9600,
},
)
parity = fields.String(
metadata={
"description": "Device parity",
"example": "N",
},
)
stopbits = fields.Integer(
metadata={
"description": "Device stopbits",
"example": 1,
},
)
cycle = fields.Integer(
metadata={
"description": "Device cycle",
"example": 1,
},
)
driver = fields.String(
metadata={
"description": "Device driver",
"example": "NMEA",
},
)
subtype = fields.String(
metadata={
"description": "Device subtype",
"example": "AXN_2.31_3339_13101700,5632,PA6H,1.0",
},
)
mode = fields.String(
validate=lambda mode: mode in ["NO_FIX", "TWO_D", "THREE_D"],
metadata={
"description": "Device mode, one of NO_FIX, TWO_D, THREE_D",
"example": "3D",
},
)
@pre_load
def pre_load(self, data, **_):
from platypush.plugins.gps import DeviceMode
if data and data.get("mode"):
data["mode"] = DeviceMode(data["mode"]).value
return data
class GpsStatusSchema(Schema):
"""
Schema for the GPS status.
"""
latitude = fields.Float(
metadata={
"description": "Latitude",
"example": 45.4642,
},
)
longitude = fields.Float(
metadata={
"description": "Longitude",
"example": 9.1900,
},
)
altitude = fields.Float(
metadata={
"description": "Altitude (in meters)",
"example": 100,
},
)
speed = fields.Float(
metadata={
"description": "Measured speed, if available (in km/h)",
"example": 10,
},
)
satellites_used = fields.Integer(
metadata={
"description": "Number of satellites used for the fix",
"example": 4,
},
)
timestamp = DateTime(
metadata={
"description": "Timestamp of the last GPS update",
"example": "2021-08-01T00:00:00",
},
)
devices = fields.Dict(
keys=fields.String(),
values=fields.Nested(GpsDeviceSchema),
metadata={
"description": "Available GPS devices",
"example": {
"/dev/ttyUSB0": {
"path": "/dev/ttyUSB0",
"activated": "2021-08-01T00:00:00",
"native": False,
"baudrate": 9600,
"parity": "N",
"stopbits": 1,
"cycle": 1,
"driver": "NMEA",
"subtype": "AXN_2.31_3339_13101700,5632,PA6H,1.0",
"mode": "3D",
}
},
},
)