]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
typing: make zmsg.py typecheck
[loctrkd.git] / gps303 / gps303proto.py
index e3e7cf18cd634f2ab1261f26dcbb3643e7798738..b70cacf1eb328571171272657c92c97d0f04b2f1 100755 (executable)
@@ -17,13 +17,15 @@ Forewarnings:
 from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
-from struct import pack, unpack
+from struct import error, pack, unpack
+from typing import Any, Callable, Tuple
 
 __all__ = (
     "class_by_prefix",
     "inline_response",
     "parse_message",
     "proto_by_name",
+    "DecodeError",
     "Respond",
     "GPS303Pkt",
     "UNKNOWN",
@@ -66,6 +68,12 @@ __all__ = (
 )
 
 
+class DecodeError(Exception):
+    def __init__(self, e, **kwargs):
+        super().__init__(e)
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
 def intx(x):
     if isinstance(x, str):
         x = int(x, 0)
@@ -146,8 +154,8 @@ class Respond(Enum):
 class GPS303Pkt(metaclass=MetaPkt):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
-    IN_KWARGS = ()
-    OUT_KWARGS = ()
+    IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
+    OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
 
     def __init__(self, *args, **kwargs):
         """
@@ -157,7 +165,10 @@ class GPS303Pkt(metaclass=MetaPkt):
         assert not args or (len(args) == 2 and not kwargs)
         if args:  # guaranteed to be two arguments at this point
             self.length, self.payload = args
-            self.decode(self.length, self.payload)
+            try:
+                self.decode(self.length, self.payload)
+            except error as e:
+                raise DecodeError(e, obj=self)
         else:
             for kw, typ, dfl in self.KWARGS:
                 setattr(self, kw, typ(kwargs.pop(kw, dfl)))
@@ -656,12 +667,20 @@ def parse_message(packet, is_incoming=True):
     """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
-    if proto in CLASSES:
-        if is_incoming:
-            return CLASSES[proto].In(length, payload)
-        else:
-            return CLASSES[proto].Out(length, payload)
+    if proto not in CLASSES:
+        cause = ValueError(f"Proto {proto} is unknown")
     else:
+        try:
+            if is_incoming:
+                return CLASSES[proto].In(length, payload)
+            else:
+                return CLASSES[proto].Out(length, payload)
+        except DecodeError as e:
+            cause = e
+    if is_incoming:
         retobj = UNKNOWN.In(length, payload)
-        retobj.PROTO = proto  # Override class attr with object attr
-        return retobj
+    else:
+        retobj = UNKNOWN.Out(length, payload)
+    retobj.PROTO = proto  # Override class attr with object attr
+    retobj.cause = cause
+    return retobj