]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
make parse_message return UNKNOWN on parse crash
[loctrkd.git] / gps303 / gps303proto.py
index c19c7c1b300c78fe2ea511c1a7859f0c35c434e4..8036d65e92a1a2e789b16b68774817c7a303a13c 100755 (executable)
@@ -17,13 +17,14 @@ Forewarnings:
 from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
-from struct import pack, unpack
+from struct import error, pack, unpack
 
 __all__ = (
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "proto_by_name",
+    "DecodeError",
     "Respond",
     "GPS303Pkt",
     "UNKNOWN",
@@ -66,6 +67,12 @@ __all__ = (
 )
 
 
+class DecodeError(Exception):
+    def __init__(self, e, **kwargs):
+        super().__init__(e)
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
 def intx(x):
     if isinstance(x, str):
         x = int(x, 0)
@@ -150,15 +157,24 @@ class GPS303Pkt(metaclass=MetaPkt):
     OUT_KWARGS = ()
 
     def __init__(self, *args, **kwargs):
-        assert len(args) == 0
-        for kw, typ, dfl in self.KWARGS:
-            setattr(self, kw, typ(kwargs.pop(kw, dfl)))
-        if kwargs:
-            print("KWARGS", self.KWARGS)
-            print("kwargs", kwargs)
-            raise TypeError(
-                self.__class__.__name__ + " stray kwargs " + str(kwargs)
-            )
+        """
+        Construct the object _either_ from (length, payload),
+        _or_ from the values of individual fields
+        """
+        assert not args or (len(args) == 2 and not kwargs)
+        if args:  # guaranteed to be two arguments at this point
+            self.length, self.payload = args
+            try:
+                self.decode(self.length, self.payload)
+            except error as e:
+                raise DecodeError(e, obj=self)
+        else:
+            for kw, typ, dfl in self.KWARGS:
+                setattr(self, kw, typ(kwargs.pop(kw, dfl)))
+            if kwargs:
+                raise ValueError(
+                    self.__class__.__name__ + " stray kwargs " + str(kwargs)
+                )
 
     def __repr__(self):
         return "{}({})".format(
@@ -176,19 +192,21 @@ class GPS303Pkt(metaclass=MetaPkt):
         )
 
     def in_decode(self, length, packet):
+        # Overridden in subclasses, otherwise do not decode payload
         return
 
     def out_decode(self, length, packet):
-        raise NotImplementedError(
-            self.__class__.__name__ + ".decode() not implemented"
-        )
+        # Overridden in subclasses, otherwise do not decode payload
+        return
 
     def in_encode(self):
+        # Necessary to emulate terminal, which is not implemented
         raise NotImplementedError(
             self.__class__.__name__ + ".encode() not implemented"
         )
 
     def out_encode(self):
+        # Overridden in subclasses, otherwise make empty payload
         return b""
 
     @property
@@ -197,14 +215,6 @@ class GPS303Pkt(metaclass=MetaPkt):
         length = len(payload) + 1
         return pack("BB", length, self.PROTO) + payload
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = cls.In()
-        self.length = length
-        self.payload = payload
-        self.decode(length, payload)
-        return self
-
 
 class UNKNOWN(GPS303Pkt):
     PROTO = 256  # > 255 is impossible in real packets
@@ -245,8 +255,9 @@ class _GPS_POSITIONING(GPS303Pkt):
         if self.dtime == b"\0\0\0\0\0\0":
             self.devtime = None
         else:
+            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
             self.devtime = datetime(
-                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
             )
         self.gps_data_length = payload[6] >> 4
         self.gps_nb_sat = payload[6] & 0x0F
@@ -296,7 +307,6 @@ class STATUS(GPS303Pkt):
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
-    RESPOND = Respond.INL
 
 
 class RESET(GPS303Pkt):
@@ -533,12 +543,19 @@ class RESTORE_PASSWORD(GPS303Pkt):
 class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
     RESPOND = Respond.EXT
-    OUT_KWARGS = (("lat", float, None), ("lon", float, None))
+    OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
 
     def out_encode(self):
-        if self.lat is None or self.lon is None:
+        if self.latitude is None or self.longitude is None:
             return b""
-        return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
+        return "{:+#010.8g},{:+#010.8g}".format(
+            self.latitude, self.longitude
+        ).encode()
+
+    def out_decode(self, length, payload):
+        lat, lon = payload.decode().split(",")
+        self.latitude = float(lat)
+        self.longitude = float(lon)
 
 
 class MANUAL_POSITIONING(GPS303Pkt):
@@ -645,13 +662,24 @@ def inline_response(packet):
     return None
 
 
-def parse_message(packet):
+def parse_message(packet, is_incoming=True):
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
-    if proto in CLASSES:
-        return CLASSES[proto].from_packet(length, payload)
+    if proto not in CLASSES:
+        cause = ValueError(f"Proto {proto} is unknown")
+    else:
+        try:
+            if is_incoming:
+                return CLASSES[proto].In(length, payload)
+            else:
+                return CLASSES[proto].Out(length, payload)
+        except DecodeError as e:
+            cause = e
+    if is_incoming:
+        retobj = UNKNOWN.In(length, payload)
     else:
-        retobj = UNKNOWN.from_packet(length, payload)
-        retobj.PROTO = proto  # Override class attr with object attr
-        return retobj
+        retobj = UNKNOWN.Out(length, payload)
+    retobj.PROTO = proto  # Override class attr with object attr
+    retobj.cause = cause
+    return retobj