From 2b595623b3b1e024d8287aefdcbdc47bbafade5a Mon Sep 17 00:00:00 2001
From: Fabio Manganiello <fabio@manganiello.tech>
Date: Mon, 5 Feb 2024 02:13:49 +0100
Subject: [PATCH] [#348] Converted `gps` backend into a plugin.

Closes: #348
---
 docs/source/backends.rst              |   1 -
 docs/source/platypush/backend/gps.rst |   6 -
 docs/source/platypush/plugins/gps.rst |   5 +
 docs/source/plugins.rst               |   1 +
 platypush/backend/gps/__init__.py     | 145 -----------------
 platypush/backend/gps/manifest.yaml   |  20 ---
 platypush/message/event/gps.py        | 103 +++++++++---
 platypush/plugins/gps/__init__.py     | 216 ++++++++++++++++++++++++++
 platypush/plugins/gps/_model.py       |  47 ++++++
 platypush/plugins/gps/manifest.yaml   |  22 +++
 platypush/schemas/gps.py              | 169 ++++++++++++++++++++
 11 files changed, 538 insertions(+), 197 deletions(-)
 delete mode 100644 docs/source/platypush/backend/gps.rst
 create mode 100644 docs/source/platypush/plugins/gps.rst
 delete mode 100644 platypush/backend/gps/__init__.py
 delete mode 100644 platypush/backend/gps/manifest.yaml
 create mode 100644 platypush/plugins/gps/__init__.py
 create mode 100644 platypush/plugins/gps/_model.py
 create mode 100644 platypush/plugins/gps/manifest.yaml
 create mode 100644 platypush/schemas/gps.py

diff --git a/docs/source/backends.rst b/docs/source/backends.rst
index 09bd8f5cd..96ff95700 100644
--- a/docs/source/backends.rst
+++ b/docs/source/backends.rst
@@ -9,7 +9,6 @@ Backends
     platypush/backend/button.flic.rst
     platypush/backend/camera.pi.rst
     platypush/backend/chat.telegram.rst
-    platypush/backend/gps.rst
     platypush/backend/http.rst
     platypush/backend/midi.rst
     platypush/backend/music.mopidy.rst
diff --git a/docs/source/platypush/backend/gps.rst b/docs/source/platypush/backend/gps.rst
deleted file mode 100644
index 4542ee3cd..000000000
--- a/docs/source/platypush/backend/gps.rst
+++ /dev/null
@@ -1,6 +0,0 @@
-``gps``
-=========================
-
-.. automodule:: platypush.backend.gps
-    :members:
-
diff --git a/docs/source/platypush/plugins/gps.rst b/docs/source/platypush/plugins/gps.rst
new file mode 100644
index 000000000..f0a96ad2c
--- /dev/null
+++ b/docs/source/platypush/plugins/gps.rst
@@ -0,0 +1,5 @@
+``gps``
+=======
+
+.. automodule:: platypush.plugins.gps
+    :members:
diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst
index 4458165b5..c8595b29b 100644
--- a/docs/source/plugins.rst
+++ b/docs/source/plugins.rst
@@ -46,6 +46,7 @@ Plugins
     platypush/plugins/gotify.rst
     platypush/plugins/gpio.rst
     platypush/plugins/gpio.zeroborg.rst
+    platypush/plugins/gps.rst
     platypush/plugins/graphite.rst
     platypush/plugins/hid.rst
     platypush/plugins/http.request.rst
diff --git a/platypush/backend/gps/__init__.py b/platypush/backend/gps/__init__.py
deleted file mode 100644
index 2c6b9a79b..000000000
--- a/platypush/backend/gps/__init__.py
+++ /dev/null
@@ -1,145 +0,0 @@
-import threading
-import time
-
-from platypush.backend import Backend
-from platypush.message.event.gps import GPSVersionEvent, GPSDeviceEvent, GPSUpdateEvent
-
-
-class GpsBackend(Backend):
-    """
-    This backend can interact with a GPS device and listen for events.
-
-    Once installed gpsd you need to run it and associate it to your device. Example if your GPS device communicates
-    over USB and is available on /dev/ttyUSB0::
-
-        [sudo] gpsd /dev/ttyUSB0 -F /var/run/gpsd.sock
-
-    The best option is probably to run gpsd at startup as a systemd service.
-    """
-
-    _fail_sleep_time = 5.0
-    _lat_lng_tolerance = 1e-5
-    _alt_tolerance = 0.5
-
-    def __init__(self, gpsd_server='localhost', gpsd_port=2947, **kwargs):
-        """
-        :param gpsd_server: gpsd daemon server name/address (default: localhost)
-        :type gpsd_server: str
-        :param gpsd_port: Port of the gpsd daemon (default: 2947)
-        :type gpsd_port: int or str
-        """
-        super().__init__(**kwargs)
-
-        self.gpsd_server = gpsd_server
-        self.gpsd_port = gpsd_port
-        self._session = None
-        self._session_lock = threading.RLock()
-        self._devices = {}
-
-    def _get_session(self):
-        import gps
-
-        with self._session_lock:
-            if not self._session:
-                self._session = gps.gps(
-                    host=self.gpsd_server, port=self.gpsd_port, reconnect=True
-                )
-                self._session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)
-
-        return self._session
-
-    def _gps_report_to_event(self, report):
-        if report.get('class').lower() == 'version':
-            return GPSVersionEvent(
-                release=report.get('release'),
-                rev=report.get('rev'),
-                proto_major=report.get('proto_major'),
-                proto_minor=report.get('proto_minor'),
-            )
-        if report.get('class').lower() == 'devices':
-            for device in report.get('devices', []):
-                if device.get(
-                    'path'
-                ) not in self._devices or device != self._devices.get('path'):
-                    # noinspection DuplicatedCode
-                    self._devices[device.get('path')] = device
-                    return GPSDeviceEvent(
-                        path=device.get('path'),
-                        activated=device.get('activated'),
-                        native=device.get('native'),
-                        bps=device.get('bps'),
-                        parity=device.get('parity'),
-                        stopbits=device.get('stopbits'),
-                        cycle=device.get('cycle'),
-                        driver=device.get('driver'),
-                    )
-        if report.get('class').lower() == 'device':
-            # noinspection DuplicatedCode
-            self._devices[report.get('path')] = report
-            return GPSDeviceEvent(
-                path=report.get('path'),
-                activated=report.get('activated'),
-                native=report.get('native'),
-                bps=report.get('bps'),
-                parity=report.get('parity'),
-                stopbits=report.get('stopbits'),
-                cycle=report.get('cycle'),
-                driver=report.get('driver'),
-            )
-        if report.get('class').lower() == 'tpv':
-            return GPSUpdateEvent(
-                device=report.get('device'),
-                latitude=report.get('lat'),
-                longitude=report.get('lon'),
-                altitude=report.get('alt'),
-                mode=report.get('mode'),
-                epv=report.get('epv'),
-                eph=report.get('eph'),
-                sep=report.get('sep'),
-            )
-
-    def run(self):
-        super().run()
-        self.logger.info(
-            'Initialized GPS backend on {}:{}'.format(self.gpsd_server, self.gpsd_port)
-        )
-        last_event = None
-
-        while not self.should_stop():
-            try:
-                session = self._get_session()
-                report = session.next()
-                event = self._gps_report_to_event(report)
-                if event and (
-                    last_event is None
-                    or abs(
-                        (last_event.args.get('latitude') or 0)
-                        - (event.args.get('latitude') or 0)
-                    )
-                    >= self._lat_lng_tolerance
-                    or abs(
-                        (last_event.args.get('longitude') or 0)
-                        - (event.args.get('longitude') or 0)
-                    )
-                    >= self._lat_lng_tolerance
-                    or abs(
-                        (last_event.args.get('altitude') or 0)
-                        - (event.args.get('altitude') or 0)
-                    )
-                    >= self._alt_tolerance
-                ):
-                    self.bus.post(event)
-                    last_event = event
-            except Exception as e:
-                if isinstance(e, StopIteration):
-                    self.logger.warning(
-                        'GPS service connection lost, check that gpsd is running'
-                    )
-                else:
-                    self.logger.exception(e)
-
-                self._session = None
-                time.sleep(self._fail_sleep_time)
-
-
-# vim:sw=4:ts=4:et:
diff --git a/platypush/backend/gps/manifest.yaml b/platypush/backend/gps/manifest.yaml
deleted file mode 100644
index cb1879f53..000000000
--- a/platypush/backend/gps/manifest.yaml
+++ /dev/null
@@ -1,20 +0,0 @@
-manifest:
-  events:
-    platypush.message.event.gps.GPSDeviceEvent: when a GPS device is connected or
-      updated
-    platypush.message.event.gps.GPSUpdateEvent: when a GPS device has new data
-    platypush.message.event.gps.GPSVersionEvent: when a GPS device advertises its
-      version data
-  install:
-    apk:
-      - gpsd
-    apt:
-      - gpsd
-    dnf:
-      - gpsd
-    pacman:
-      - gpsd
-    pip:
-      - gps
-  package: platypush.backend.gps
-  type: backend
diff --git a/platypush/message/event/gps.py b/platypush/message/event/gps.py
index 7b657a171..fd6f25c14 100644
--- a/platypush/message/event/gps.py
+++ b/platypush/message/event/gps.py
@@ -1,44 +1,97 @@
+from abc import ABC
+from datetime import datetime
+from typing import Optional
+
 from platypush.message.event import Event
 
 
-class GPSEvent(Event):
+class GPSEvent(Event, ABC):
     """
-    Generic class for GPS events
+    Generic class for GPS events.
     """
 
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-
-class GPSVersionEvent(GPSEvent):
-    """
-    Event usually triggered on startup or reconnection, when the GPS device advertises its version parameters
-    """
-
-    def __init__(self, release=None, rev=None, proto_major=None, proto_minor=None, *args, **kwargs):
-        super().__init__(release=release, rev=rev, proto_major=proto_major, proto_minor=proto_minor, *args, **kwargs)
-
 
 class GPSDeviceEvent(GPSEvent):
     """
-    Event triggered when a new GPS device is connected or reconfigured
+    Event triggered when a new GPS device is connected or reconfigured.
     """
 
-    def __init__(self, path, activated=None, native=False, bps=None, parity=None, stopbits=None,
-                 cycle=None, driver=None, *args, **kwargs):
-        super().__init__(*args, path=path, activated=activated, native=native, bps=bps, parity=parity,
-                         stopbits=stopbits, cycle=cycle, driver=driver, **kwargs)
+    def __init__(
+        self,
+        path: str,
+        *args,
+        activated: Optional[datetime] = None,
+        native: bool = False,
+        baudrate: Optional[int] = None,
+        parity: Optional[str] = None,
+        stopbits: Optional[int] = None,
+        cycle: Optional[float] = None,
+        driver: Optional[str] = None,
+        subtype: Optional[str] = None,
+        **kwargs
+    ):
+        """
+        :param path: Device path.
+        :param activated: Device activation timestamp.
+        :param native: Device native status.
+        :param baudrate: Device baudrate.
+        :param parity: Device parity.
+        :param stopbits: Device stopbits.
+        :param cycle: Device cycle.
+        :param driver: Device driver.
+        :param subtype: Device subtype.
+        """
+        super().__init__(
+            *args,
+            path=path,
+            activated=activated,
+            native=native,
+            baudrate=baudrate,
+            parity=parity,
+            stopbits=stopbits,
+            cycle=cycle,
+            driver=driver,
+            subtype=subtype,
+            **kwargs
+        )
 
 
-class GPSUpdateEvent(GPSEvent):
+class GPSLocationUpdateEvent(GPSEvent):
     """
-    Event triggered upon GPS status update
+    Event triggered upon GPS status update.
     """
 
-    def __init__(self, device=None, latitude=None, longitude=None, altitude=None, mode=None, epv=None, eph=None,
-                 sep=None, *args, **kwargs):
-        super().__init__(*args, device=device, latitude=latitude, longitude=longitude, altitude=altitude,
-                         mode=mode, epv=epv, eph=eph, sep=sep, **kwargs)
+    def __init__(
+        self,
+        *args,
+        device=None,
+        latitude=None,
+        longitude=None,
+        altitude=None,
+        mode=None,
+        **kwargs
+    ):
+        super().__init__(
+            *args,
+            device=device,
+            latitude=latitude,
+            longitude=longitude,
+            altitude=altitude,
+            mode=mode,
+            **kwargs
+        )
+
+
+class GPSEnabledEvent(GPSEvent):
+    """
+    Event triggered when the GPS polling is enabled.
+    """
+
+
+class GPSDisabledEvent(GPSEvent):
+    """
+    Event triggered when the GPS polling is disabled.
+    """
 
 
 # vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/gps/__init__.py b/platypush/plugins/gps/__init__.py
new file mode 100644
index 000000000..463024b71
--- /dev/null
+++ b/platypush/plugins/gps/__init__.py
@@ -0,0 +1,216 @@
+import threading
+from contextlib import contextmanager
+from dataclasses import asdict
+from datetime import datetime
+
+from platypush.plugins import RunnablePlugin, action
+from platypush.message.event.gps import (
+    GPSDeviceEvent,
+    GPSDisabledEvent,
+    GPSEnabledEvent,
+    GPSLocationUpdateEvent,
+)
+from platypush.schemas.gps import GpsDeviceSchema, GpsStatusSchema
+
+from ._model import DeviceMode, GpsDevice, GpsStatus
+
+
+class GpsPlugin(RunnablePlugin):
+    """
+    This plugin can interact with a GPS device compatible with `gpsd
+    <https://gpsd.io/>`_ and emit events when the location changes.
+
+    It requires ``gpsd`` to run on a system with a compatible GPS device
+    connected - most of the off-the-shelf GPS devices over USB or serial
+    interfaces should tick the box.
+
+    For example, if your GPS device communicates over USB and is available on
+    /dev/ttyUSB0, you can start the gpsd daemon with the following command
+    before starting Platypush::
+
+        [sudo] gpsd /dev/ttyUSB0 [-S 2947]
+
+    It will expose GPS events over the port ``2947`` by default, and you can
+    subscribe to them through this plugin.
+    """
+
+    _default_gpsd_port = 2947
+    _default_poll_interval = 5.0
+    _lat_lng_tolerance = 1e-5
+    _alt_tolerance = 0.5
+
+    def __init__(
+        self,
+        gpsd_server: str = 'localhost',
+        gpsd_port: int = _default_gpsd_port,
+        poll_interval: float = _default_poll_interval,
+        enable_on_start: bool = True,
+        **kwargs,
+    ):
+        """
+        :param gpsd_server: gpsd daemon server name/address (default: localhost).
+        :param gpsd_port: Port of the gpsd daemon (default: 2947).
+        :param poll_interval: How long to wait before polling the GPS device
+            again in case of error (default: 5 seconds).
+        :param enable_on_start: If True, the GPS polling will be enabled when the
+            plugin starts (default: True). Otherwise, it'll have to be enabled by
+            calling the :meth:`.enable` action.
+        """
+        super().__init__(poll_interval=poll_interval, **kwargs)
+
+        self.gpsd_server = gpsd_server
+        self.gpsd_port = gpsd_port
+        self._enable_on_start = enable_on_start
+        self._session = None
+        self._session_lock = threading.RLock()
+        self._status = GpsStatus()
+
+    @contextmanager
+    def _get_session(self):
+        import gps
+
+        with self._session_lock:
+            if not self._session:
+                self._session = gps.gps(
+                    host=self.gpsd_server, port=self.gpsd_port, reconnect=True  # type: ignore
+                )
+
+        yield self._session
+
+        with self._session_lock:
+            try:
+                self.disable()
+            except Exception as e:
+                self.logger.warning('Error disabling GPSD watch: %s', e)
+
+            self._session.close()
+            self._session = None
+
+    def _update_device(self, device: dict):
+        path = device.get('path')
+        if not path:
+            return
+
+        cur_dev = self._status.devices.get(path)
+        new_dev = GpsDevice(**GpsDeviceSchema().load(device))  # type: ignore
+        if cur_dev and asdict(cur_dev) == asdict(new_dev):
+            return
+
+        self._status.devices[path] = new_dev
+        self._bus.post(GPSDeviceEvent(**asdict(new_dev)))
+
+    def _handle_location_update(self, report: dict):
+        dev, lat, long, alt, mode, t = (
+            report.get('device'),
+            report.get('lat'),
+            report.get('lon'),
+            report.get('alt'),
+            report.get('mode'),
+            report.get('time'),
+        )
+
+        if not (dev and lat and long and mode):
+            return
+
+        dev_mode = DeviceMode(mode)
+        self._status.timestamp = datetime.fromisoformat(t) if t else None
+        self._status.devices[dev].mode = dev_mode
+
+        if not (
+            abs((self._status.latitude or 0) - lat) >= self._lat_lng_tolerance
+            or abs((self._status.longitude or 0) - long) >= self._lat_lng_tolerance
+            or abs((self._status.altitude or 0) - (alt or 0)) >= self._alt_tolerance
+        ):
+            return
+
+        event = GPSLocationUpdateEvent(
+            device=dev,
+            latitude=lat,
+            longitude=long,
+            altitude=alt,
+            mode=dev_mode.name,
+        )
+
+        self._status.latitude = lat
+        self._status.longitude = long
+        self._status.altitude = alt
+        self._bus.post(event)
+
+    def _handle_report(self, report: dict):
+        cls = report['class'].lower()
+        if cls == 'version':
+            self.logger.info('Received GPSD version event: %s', dict(report))
+            return
+
+        if cls == 'watch':
+            evt_type = GPSEnabledEvent if report.get('enable') else GPSDisabledEvent
+            self._bus.post(evt_type())
+            return
+
+        if cls == 'devices':
+            for device in report.get('devices', []):
+                self._update_device(device)
+            return
+
+        if cls == 'device':
+            self._update_device(report)
+            return
+
+        if cls == 'tpv':
+            self._handle_location_update(report)
+
+    @action
+    def enable(self):
+        """
+        Enable the GPS polling.
+        """
+        import gps
+
+        assert self._session, 'GPSD session not initialized'
+        self._session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)
+
+    @action
+    def disable(self):
+        """
+        Disable the GPS polling.
+        """
+        import gps
+
+        assert self._session, 'GPSD session not initialized'
+        self._session.stream(gps.WATCH_DISABLE)
+
+    @action
+    def status(self):
+        """
+        :returns: The current GPS status:
+
+            .. schema:: gps.GpsStatusSchema
+
+        """
+        return GpsStatusSchema().dump(self._status)
+
+    def main(self):
+        while not self.should_stop():
+            first_run = True
+
+            try:
+                with self._get_session() as session:
+                    if first_run and self._enable_on_start:
+                        self.enable()
+                        first_run = False
+
+                    while not self.should_stop():
+                        report: dict = session.next()  # type: ignore
+                        self._handle_report(report)
+            except Exception as e:
+                if isinstance(e, StopIteration):
+                    self.logger.warning(
+                        'GPS service connection lost, check that gpsd is running'
+                    )
+                else:
+                    self.logger.exception(e)
+
+                self.wait_stop(self.poll_interval)
+
+
+# vim:sw=4:ts=4:et:
diff --git a/platypush/plugins/gps/_model.py b/platypush/plugins/gps/_model.py
new file mode 100644
index 000000000..b9f27108e
--- /dev/null
+++ b/platypush/plugins/gps/_model.py
@@ -0,0 +1,47 @@
+from datetime import datetime
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Dict, Optional
+
+
+class DeviceMode(Enum):
+    """
+    GPS device mode.
+    """
+
+    NO_FIX = 1  # No fix
+    TWO_D = 2  # 2D fix
+    THREE_D = 3  # 3D fix (including altitude)
+
+
+@dataclass
+class GpsDevice:
+    """
+    Models the GPS device.
+    """
+
+    path: str
+    activated: Optional[datetime] = None
+    native: bool = False
+    baudrate: Optional[int] = None
+    parity: str = 'N'
+    stopbits: Optional[int] = None
+    cycle: Optional[float] = None
+    driver: Optional[str] = None
+    subtype: Optional[str] = None
+    mode: Optional[DeviceMode] = None
+
+
+@dataclass
+class GpsStatus:
+    """
+    Models the status of the GPS service.
+    """
+
+    latitude: Optional[float] = None
+    longitude: Optional[float] = None
+    altitude: Optional[float] = None
+    speed: Optional[float] = None
+    satellites_used: int = 0
+    devices: Dict[str, GpsDevice] = field(default_factory=dict)
+    timestamp: Optional[datetime] = None
diff --git a/platypush/plugins/gps/manifest.yaml b/platypush/plugins/gps/manifest.yaml
new file mode 100644
index 000000000..389c55a13
--- /dev/null
+++ b/platypush/plugins/gps/manifest.yaml
@@ -0,0 +1,22 @@
+manifest:
+  events:
+    - platypush.message.event.gps.GPSDeviceEvent
+    - platypush.message.event.gps.GPSDisabledEvent
+    - platypush.message.event.gps.GPSEnabledEvent
+    - platypush.message.event.gps.GPSLocationUpdateEvent
+  install:
+    apk:
+      - gpsd
+      - py3-gpsd
+    apt:
+      - gpsd
+      - python3-gps
+    dnf:
+      - gpsd
+      - python-gpsd
+    pacman:
+      - gpsd
+    pip:
+      - gps
+  package: platypush.plugins.gps
+  type: plugin
diff --git a/platypush/schemas/gps.py b/platypush/schemas/gps.py
new file mode 100644
index 000000000..02295a264
--- /dev/null
+++ b/platypush/schemas/gps.py
@@ -0,0 +1,169 @@
+from marshmallow import EXCLUDE, fields, pre_load
+from marshmallow.schema import Schema
+
+from platypush.schemas import DateTime
+
+
+class GpsDeviceSchema(Schema):
+    """
+    Schema for a GPS device.
+    """
+
+    # pylint: disable=too-few-public-methods
+    class Meta:  # type: ignore
+        """
+        Exclude unknown fields from the deserialized output.
+        """
+
+        unknown = EXCLUDE
+
+    path = fields.String(
+        required=True,
+        metadata={
+            "description": "Device path",
+            "example": "/dev/ttyUSB0",
+        },
+    )
+
+    activated = DateTime(
+        metadata={
+            "description": "Device activation status",
+            "example": True,
+        },
+    )
+
+    native = fields.Boolean(
+        metadata={
+            "description": "Device native status",
+            "example": False,
+        },
+    )
+
+    baudrate = fields.Integer(
+        data_key="bps",
+        metadata={
+            "description": "Device baudrate",
+            "example": 9600,
+        },
+    )
+
+    parity = fields.String(
+        metadata={
+            "description": "Device parity",
+            "example": "N",
+        },
+    )
+
+    stopbits = fields.Integer(
+        metadata={
+            "description": "Device stopbits",
+            "example": 1,
+        },
+    )
+
+    cycle = fields.Integer(
+        metadata={
+            "description": "Device cycle",
+            "example": 1,
+        },
+    )
+
+    driver = fields.String(
+        metadata={
+            "description": "Device driver",
+            "example": "NMEA",
+        },
+    )
+
+    subtype = fields.String(
+        metadata={
+            "description": "Device subtype",
+            "example": "AXN_2.31_3339_13101700,5632,PA6H,1.0",
+        },
+    )
+
+    mode = fields.String(
+        validate=lambda mode: mode in ["NO_FIX", "TWO_D", "THREE_D"],
+        metadata={
+            "description": "Device mode, one of NO_FIX, TWO_D, THREE_D",
+            "example": "3D",
+        },
+    )
+
+    @pre_load
+    def pre_load(self, data, **_):
+        from platypush.plugins.gps import DeviceMode
+
+        if data and data.get("mode"):
+            data["mode"] = DeviceMode(data["mode"]).value
+        return data
+
+
+class GpsStatusSchema(Schema):
+    """
+    Schema for the GPS status.
+    """
+
+    latitude = fields.Float(
+        metadata={
+            "description": "Latitude",
+            "example": 45.4642,
+        },
+    )
+
+    longitude = fields.Float(
+        metadata={
+            "description": "Longitude",
+            "example": 9.1900,
+        },
+    )
+
+    altitude = fields.Float(
+        metadata={
+            "description": "Altitude (in meters)",
+            "example": 100,
+        },
+    )
+
+    speed = fields.Float(
+        metadata={
+            "description": "Measured speed, if available (in km/h)",
+            "example": 10,
+        },
+    )
+
+    satellites_used = fields.Integer(
+        metadata={
+            "description": "Number of satellites used for the fix",
+            "example": 4,
+        },
+    )
+
+    timestamp = DateTime(
+        metadata={
+            "description": "Timestamp of the last GPS update",
+            "example": "2021-08-01T00:00:00",
+        },
+    )
+
+    devices = fields.Dict(
+        keys=fields.String(),
+        values=fields.Nested(GpsDeviceSchema),
+        metadata={
+            "description": "Available GPS devices",
+            "example": {
+                "/dev/ttyUSB0": {
+                    "path": "/dev/ttyUSB0",
+                    "activated": "2021-08-01T00:00:00",
+                    "native": False,
+                    "baudrate": 9600,
+                    "parity": "N",
+                    "stopbits": 1,
+                    "cycle": 1,
+                    "driver": "NMEA",
+                    "subtype": "AXN_2.31_3339_13101700,5632,PA6H,1.0",
+                    "mode": "3D",
+                }
+            },
+        },
+    )