import threading
import time

from platypush.backend import Backend
from platypush.message.event.gps import (
    GPSVersionEvent,
    GPSDeviceEvent,
    GPSUpdateEvent,
)


class GpsBackend(Backend):
    """
    This backend can interact with a GPS device and listen for events.

    Once gpsd is installed, you need to run it and associate it with your
    device. For example, if your GPS device communicates over USB and is
    available on /dev/ttyUSB0::

        [sudo] gpsd /dev/ttyUSB0 -F /var/run/gpsd.sock

    The best option is probably to run gpsd at startup as a systemd service.
    """

    # Seconds to sleep after a failed read before a new session is attempted.
    _fail_sleep_time = 5.0
    # Minimum absolute latitude/longitude change required to post a new
    # position update (presumably degrees - confirm against gpsd TPV units).
    _lat_lng_tolerance = 1e-5
    # Minimum absolute altitude change required to post a new position update
    # (presumably meters - confirm against gpsd TPV units).
    _alt_tolerance = 0.5

    def __init__(self, gpsd_server='localhost', gpsd_port=2947, **kwargs):
        """
        :param gpsd_server: gpsd daemon server name/address (default: localhost)
        :type gpsd_server: str
        :param gpsd_port: Port of the gpsd daemon (default: 2947)
        :type gpsd_port: int or str
        """
        super().__init__(**kwargs)

        self.gpsd_server = gpsd_server
        self.gpsd_port = gpsd_port
        self._session = None
        # Guards creation and reset of ``self._session``.
        self._session_lock = threading.RLock()
        # Last known device descriptor, indexed by device path.
        self._devices = {}

    def _get_session(self):
        """
        Return the cached gpsd session, opening and subscribing a new one if
        none is currently stored.

        :return: The ``gps.gps`` session object.
        """
        # Imported lazily so the module can be loaded without the gps package.
        import gps

        with self._session_lock:
            if not self._session:
                session = gps.gps(
                    host=self.gpsd_server, port=self.gpsd_port, reconnect=True
                )
                session.stream(gps.WATCH_ENABLE | gps.WATCH_NEWSTYLE)
                # Only cache the session once the watch has been enabled.
                self._session = session

            return self._session

    def _device_event(self, device):
        """
        Store ``device`` as the last known state for its path and build the
        corresponding event.

        :param device: Mapping with the device attributes (``path``,
            ``activated``, ``native``, ``bps``, ``parity``, ``stopbits``,
            ``cycle``, ``driver``).
        :return: A :class:`GPSDeviceEvent` describing the device.
        """
        self._devices[device.get('path')] = device
        return GPSDeviceEvent(
            path=device.get('path'),
            activated=device.get('activated'),
            native=device.get('native'),
            bps=device.get('bps'),
            parity=device.get('parity'),
            stopbits=device.get('stopbits'),
            cycle=device.get('cycle'),
            driver=device.get('driver'),
        )

    def _gps_report_to_event(self, report):
        """
        Convert a report read from the gpsd session into a platypush event.

        :param report: Report object exposing a dict-like ``get`` method.
        :return: A :class:`GPSVersionEvent`, :class:`GPSDeviceEvent` or
            :class:`GPSUpdateEvent` depending on the ``class`` field of the
            report, or ``None`` if the report is of another class or carries
            no new information.
        """
        # A report without a ``class`` field maps to no event instead of
        # raising an AttributeError on ``None``.
        report_class = (report.get('class') or '').lower()

        if report_class == 'version':
            return GPSVersionEvent(
                release=report.get('release'),
                rev=report.get('rev'),
                proto_major=report.get('proto_major'),
                proto_minor=report.get('proto_minor'),
            )

        if report_class == 'devices':
            for device in report.get('devices', []):
                # Compare against the state stored for *this* device's path;
                # an unknown path yields ``None`` and therefore also differs.
                if device != self._devices.get(device.get('path')):
                    # Only the first new/changed device of the report is
                    # turned into an event.
                    return self._device_event(device)

            return None

        if report_class == 'device':
            return self._device_event(report)

        if report_class == 'tpv':
            return GPSUpdateEvent(
                device=report.get('device'),
                latitude=report.get('lat'),
                longitude=report.get('lon'),
                altitude=report.get('alt'),
                mode=report.get('mode'),
                epv=report.get('epv'),
                eph=report.get('eph'),
                sep=report.get('sep'),
            )

        return None

    def _position_changed(self, previous, current):
        """
        Tell whether two position updates differ by at least the configured
        tolerance on latitude, longitude or altitude.

        :param previous: Last posted :class:`GPSUpdateEvent`.
        :param current: Newly received :class:`GPSUpdateEvent`.
        :return: ``True`` if any coordinate moved by its tolerance or more.
        """

        def moved(attr, tolerance):
            # Missing coordinates are treated as 0.
            before = previous.args.get(attr) or 0
            after = current.args.get(attr) or 0
            return abs(before - after) >= tolerance

        return (
            moved('latitude', self._lat_lng_tolerance)
            or moved('longitude', self._lat_lng_tolerance)
            or moved('altitude', self._alt_tolerance)
        )

    def run(self):
        """
        Read reports from gpsd until the backend is stopped and post the
        resulting events on the bus.

        Version and device events are always posted. Position updates are
        posted only if they differ from the last posted position update by at
        least the configured tolerances. On any error the session is dropped
        and a new one is attempted after ``_fail_sleep_time`` seconds.
        """
        super().run()
        self.logger.info(
            'Initialized GPS backend on {}:{}'.format(self.gpsd_server, self.gpsd_port)
        )

        last_update = None
        while not self.should_stop():
            try:
                session = self._get_session()
                report = session.next()
                event = self._gps_report_to_event(report)
                if not event:
                    continue

                # The tolerance filter only makes sense for position updates:
                # other events carry no coordinates and must neither be
                # dropped by it nor reset the comparison baseline.
                if isinstance(event, GPSUpdateEvent):
                    if last_update is not None and not self._position_changed(
                        last_update, event
                    ):
                        continue

                    last_update = event

                self.bus.post(event)
            except Exception as e:
                if isinstance(e, StopIteration):
                    self.logger.warning(
                        'GPS service connection lost, check that gpsd is running'
                    )
                else:
                    self.logger.exception(e)

                # Drop the session so that the next iteration opens a new one.
                with self._session_lock:
                    self._session = None

                time.sleep(self._fail_sleep_time)


# vim:sw=4:ts=4:et: