X-Git-Url: http://www.average.org/gitweb/?a=blobdiff_plain;f=gps303%2Fgps303proto.py;h=8036d65e92a1a2e789b16b68774817c7a303a13c;hb=027572f7b467169db2278cdedfd6955458f7be2e;hp=136f86ba3df3e5a47044bfac09536cd691a1f45e;hpb=2a7ebf5ab639f4f3a30d5b737f292ac0958d7d1b;p=loctrkd.git diff --git a/gps303/gps303proto.py b/gps303/gps303proto.py index 136f86b..8036d65 100755 --- a/gps303/gps303proto.py +++ b/gps303/gps303proto.py @@ -17,13 +17,14 @@ Forewarnings: from datetime import datetime, timezone from enum import Enum from inspect import isclass -from struct import pack, unpack +from struct import error, pack, unpack __all__ = ( "class_by_prefix", "inline_response", "parse_message", "proto_by_name", + "DecodeError", "Respond", "GPS303Pkt", "UNKNOWN", @@ -66,6 +67,12 @@ __all__ = ( ) +class DecodeError(Exception): + def __init__(self, e, **kwargs): + super().__init__(e) + for k, v in kwargs.items(): + setattr(self, k, v) + def intx(x): if isinstance(x, str): x = int(x, 0) @@ -157,7 +164,10 @@ class GPS303Pkt(metaclass=MetaPkt): assert not args or (len(args) == 2 and not kwargs) if args: # guaranteed to be two arguments at this point self.length, self.payload = args - self.decode(self.length, self.payload) + try: + self.decode(self.length, self.payload) + except error as e: + raise DecodeError(e, obj=self) else: for kw, typ, dfl in self.KWARGS: setattr(self, kw, typ(kwargs.pop(kw, dfl))) @@ -186,10 +196,8 @@ class GPS303Pkt(metaclass=MetaPkt): return def out_decode(self, length, packet): - # Necessary to emulate terminal, which is not implemented - raise NotImplementedError( - self.__class__.__name__ + ".decode() not implemented" - ) + # Overridden in subclasses, otherwise do not decode payload + return def in_encode(self): # Necessary to emulate terminal, which is not implemented @@ -299,7 +307,6 @@ class STATUS(GPS303Pkt): class HIBERNATION(GPS303Pkt): # Server can send to send devicee to sleep PROTO = 0x14 - RESPOND = Respond.INL class RESET(GPS303Pkt): @@ -536,12 +543,19 @@ class RESTORE_PASSWORD(GPS303Pkt): class WIFI_POSITIONING(_WIFI_POSITIONING): PROTO = 0x69 RESPOND = Respond.EXT - OUT_KWARGS = (("lat", float, None), ("lon", float, None)) + OUT_KWARGS = (("latitude", float, None), ("longitude", float, None)) def out_encode(self): - if self.lat is None or self.lon is None: + if self.latitude is None or self.longitude is None: return b"" - return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode() + return "{:+#010.8g},{:+#010.8g}".format( + self.latitude, self.longitude + ).encode() + + def out_decode(self, length, payload): + lat, lon = payload.decode().split(",") + self.latitude = float(lat) + self.longitude = float(lon) class MANUAL_POSITIONING(GPS303Pkt): @@ -648,13 +662,24 @@ def inline_response(packet): return None -def parse_message(packet): +def parse_message(packet, is_incoming=True): """From a packet (without framing bytes) derive the XXX.In object""" length, proto = unpack("BB", packet[:2]) payload = packet[2:] - if proto in CLASSES: - return CLASSES[proto].In(length, payload) + if proto not in CLASSES: + cause = ValueError(f"Proto {proto} is unknown") else: + try: + if is_incoming: + return CLASSES[proto].In(length, payload) + else: + return CLASSES[proto].Out(length, payload) + except DecodeError as e: + cause = e + if is_incoming: retobj = UNKNOWN.In(length, payload) - retobj.PROTO = proto # Override class attr with object attr - return retobj + else: + retobj = UNKNOWN.Out(length, payload) + retobj.PROTO = proto # Override class attr with object attr + retobj.cause = cause + return retobj