]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
typing: make zmsg.py typecheck
[loctrkd.git] / gps303 / gps303proto.py
index db8fd014637279ca256b14d44466b3d7e5eb3a0f..b70cacf1eb328571171272657c92c97d0f04b2f1 100755 (executable)
@@ -17,15 +17,15 @@ Forewarnings:
 from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
-from logging import getLogger
-from struct import pack, unpack
+from struct import error, pack, unpack
+from typing import Any, Callable, Tuple
 
 __all__ = (
     "class_by_prefix",
     "inline_response",
-    "make_object",
     "parse_message",
     "proto_by_name",
+    "DecodeError",
     "Respond",
     "GPS303Pkt",
     "UNKNOWN",
@@ -40,7 +40,18 @@ __all__ = (
     "WHITELIST_TOTAL",
     "WIFI_OFFLINE_POSITIONING",
     "TIME",
+    "PROHIBIT_LBS",
+    "GPS_LBS_SWITCH_TIMES",
+    "REMOTE_MONITOR_PHONE",
+    "SOS_PHONE",
+    "DAD_PHONE",
     "MOM_PHONE",
+    "STOP_UPLOAD",
+    "GPS_OFF_PERIOD",
+    "DND_PERIOD",
+    "RESTART_SHUTDOWN",
+    "DEVICE",
+    "ALARM_CLOCK",
     "STOP_ALARM",
     "SETUP",
     "SYNCHRONOUS_WHITELIST",
@@ -52,9 +63,49 @@ __all__ = (
     "CHARGER_DISCONNECTED",
     "VIBRATION_RECEIVED",
     "POSITION_UPLOAD_INTERVAL",
+    "SOS_ALARM",
+    "UNKNOWN_B3",
 )
 
-log = getLogger("gps303")
+
+class DecodeError(Exception):
+    def __init__(self, e, **kwargs):
+        super().__init__(e)
+        for k, v in kwargs.items():
+            setattr(self, k, v)
+
+def intx(x):
+    if isinstance(x, str):
+        x = int(x, 0)
+    return x
+
+
+def hhmm(x):
+    """Check for the string that represents hours and minutes"""
+    if not isinstance(x, str) or len(x) != 4:
+        raise ValueError(str(x) + " is not a four-character string")
+    hh = int(x[:2])
+    mm = int(x[2:])
+    if hh < 0 or hh > 23 or mm < 0 or mm > 59:
+        raise ValueError(str(x) + " does not contain valid hours and minutes")
+    return x
+
+
+def l3str(x):
+    if isinstance(x, str):
+        x = x.split(",")
+    if len(x) != 3 or not all(isinstance(el, str) for el in x):
+        raise ValueError(str(x) + " is not a list of three strings")
+    return x
+
+
+def l3int(x):
+    if isinstance(x, str):
+        x = x.split(",")
+        x = [int(el) for el in x]
+    if len(x) != 3 or not all(isinstance(el, int) for el in x):
+        raise ValueError(str(x) + " is not a list of three integers")
+    return x
 
 
 class MetaPkt(type):
@@ -71,23 +122,25 @@ class MetaPkt(type):
 
     def __new__(cls, name, bases, attrs):
         newcls = super().__new__(cls, name, bases, attrs)
-        nestattrs = {"encode": lambda self: self.in_encode()}
-        if "IN_KWARGS" in attrs:
-            nestattrs["KWARGS"] = attrs["IN_KWARGS"]
         newcls.In = super().__new__(
             cls,
             name + ".In",
             (newcls,) + bases,
-            nestattrs,
+            {
+                "KWARGS": newcls.IN_KWARGS,
+                "decode": newcls.in_decode,
+                "encode": newcls.in_encode,
+            },
         )
-        nestattrs = {"encode": lambda self: self.out_encode()}
-        if "OUT_KWARGS" in attrs:
-            nestattrs["KWARGS"] = attrs["OUT_KWARGS"]
         newcls.Out = super().__new__(
             cls,
             name + ".Out",
             (newcls,) + bases,
-            nestattrs,
+            {
+                "KWARGS": newcls.OUT_KWARGS,
+                "decode": newcls.out_decode,
+                "encode": newcls.out_encode,
+            },
         )
         return newcls
 
@@ -101,19 +154,28 @@ class Respond(Enum):
 class GPS303Pkt(metaclass=MetaPkt):
     RESPOND = Respond.NON  # Do not send anything back by default
     PROTO: int
-    # Have these kwargs for now, TODO redo
-    IN_KWARGS = (("length", int, 0), ("payload", bytes, b""))
+    IN_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
+    OUT_KWARGS: Tuple[Tuple[str, Callable, Any], ...] = ()
 
     def __init__(self, *args, **kwargs):
-        assert len(args) == 0
-        for kw, typ, dfl in self.KWARGS:
-            setattr(self, kw, typ(kwargs.pop(kw, dfl)))
-        if kwargs:
-            print("KWARGS", self.KWARGS)
-            print("kwargs", kwargs)
-            raise TypeError(
-                self.__class__.__name__ + " stray kwargs " + str(kwargs)
-            )
+        """
+        Construct the object _either_ from (length, payload),
+        _or_ from the values of individual fields
+        """
+        assert not args or (len(args) == 2 and not kwargs)
+        if args:  # guaranteed to be two arguments at this point
+            self.length, self.payload = args
+            try:
+                self.decode(self.length, self.payload)
+            except error as e:
+                raise DecodeError(e, obj=self)
+        else:
+            for kw, typ, dfl in self.KWARGS:
+                setattr(self, kw, typ(kwargs.pop(kw, dfl)))
+            if kwargs:
+                raise ValueError(
+                    self.__class__.__name__ + " stray kwargs " + str(kwargs)
+                )
 
     def __repr__(self):
         return "{}({})".format(
@@ -130,12 +192,22 @@ class GPS303Pkt(metaclass=MetaPkt):
             ),
         )
 
+    def in_decode(self, length, packet):
+        # Overridden in subclasses, otherwise do not decode payload
+        return
+
+    def out_decode(self, length, packet):
+        # Overridden in subclasses, otherwise do not decode payload
+        return
+
     def in_encode(self):
+        # Necessary to emulate terminal, which is not implemented
         raise NotImplementedError(
             self.__class__.__name__ + ".encode() not implemented"
         )
 
     def out_encode(self):
+        # Overridden in subclasses, otherwise make empty payload
         return b""
 
     @property
@@ -144,10 +216,6 @@ class GPS303Pkt(metaclass=MetaPkt):
         length = len(payload) + 1
         return pack("BB", length, self.PROTO) + payload
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        return cls.In(payload=payload, length=length)
-
 
 class UNKNOWN(GPS303Pkt):
     PROTO = 256  # > 255 is impossible in real packets
@@ -158,9 +226,7 @@ class LOGIN(GPS303Pkt):
     RESPOND = Respond.INL
     # Default response for ACK, can also respond with STOP_UPLOAD
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.imei = payload[:-1].hex()
         self.ver = unpack("B", payload[-1:])[0]
         return self
@@ -185,15 +251,14 @@ class HEARTBEAT(GPS303Pkt):
 class _GPS_POSITIONING(GPS303Pkt):
     RESPOND = Respond.INL
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
             self.devtime = None
         else:
+            yr, mo, da, hr, mi, se = unpack("BBBBBB", self.dtime)
             self.devtime = datetime(
-                *unpack("BBBBBB", self.dtime), tzinfo=timezone.utc
+                2000 + yr, mo, da, hr, mi, se, tzinfo=timezone.utc
             )
         self.gps_data_length = payload[6] >> 4
         self.gps_nb_sat = payload[6] & 0x0F
@@ -203,7 +268,7 @@ class _GPS_POSITIONING(GPS303Pkt):
         flip_lat = not bool(flags & 0b0000010000000000)  # bit 5
         self.heading = flags & 0b0000001111111111  # bits 6 - last
         self.latitude = lat / (30000 * 60) * (-1 if flip_lat else 1)
-        self.longitude = lon / (30000 * 60) * (-2 if flip_lon else 1)
+        self.longitude = lon / (30000 * 60) * (-1 if flip_lon else 1)
         self.speed = speed
         self.flags = flags
         return self
@@ -227,31 +292,22 @@ class STATUS(GPS303Pkt):
     RESPOND = Respond.EXT
     OUT_KWARGS = (("upload_interval", int, 25),)
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
-        if len(payload) == 5:
-            (
-                self.batt,
-                self.ver,
-                self.timezone,
-                self.intvl,
-                self.signal,
-            ) = unpack("BBBBB", payload)
-        elif len(payload) == 4:
-            self.batt, self.ver, self.timezone, self.intvl = unpack(
-                "BBBB", payload
-            )
+    def in_decode(self, length, payload):
+        self.batt, self.ver, self.timezone, self.intvl = unpack(
+            "BBBB", payload[:4]
+        )
+        if len(payload) > 4:
+            self.signal = payload[4]
+        else:
             self.signal = None
         return self
 
     def out_encode(self):  # Set interval in minutes
-        return cls.make_packet(pack("B", self.upload_interval))
+        return pack("B", self.upload_interval)
 
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
     PROTO = 0x14
-    RESPOND = Respond.INL
 
 
 class RESET(GPS303Pkt):
@@ -269,9 +325,7 @@ class WHITELIST_TOTAL(GPS303Pkt):  # Server sends to initiage sync (0x58)
 
 
 class _WIFI_POSITIONING(GPS303Pkt):
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.dtime = payload[:6]
         if self.dtime == b"\0\0\0\0\0\0":
             self.devtime = None
@@ -366,7 +420,11 @@ class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
 
 class GPS_OFF_PERIOD(GPS303Pkt):
     PROTO = 0x46
-    OUT_KWARGS = (("onoff", int, 0), ("fm", str, "0000"), ("to", str, "2359"))
+    OUT_KWARGS = (
+        ("onoff", int, 0),
+        ("fm", hhmm, "0000"),
+        ("to", hhmm, "2359"),
+    )
 
     def out_encode(self):
         return (
@@ -381,10 +439,10 @@ class DND_PERIOD(GPS303Pkt):
     OUT_KWARGS = (
         ("onoff", int, 0),
         ("week", int, 3),
-        ("fm1", str, "0000"),
-        ("to1", str, "2359"),
-        ("fm2", str, "0000"),
-        ("to2", str, "2359"),
+        ("fm1", hhmm, "0000"),
+        ("to1", hhmm, "2359"),
+        ("fm2", hhmm, "0000"),
+        ("to2", hhmm, "2359"),
     )
 
     def out_endode(self):
@@ -400,7 +458,7 @@ class DND_PERIOD(GPS303Pkt):
 
 class RESTART_SHUTDOWN(GPS303Pkt):
     PROTO = 0x48
-    OUT_KWARGS = (("flag", int, 2),)
+    OUT_KWARGS = (("flag", int, 0),)
 
     def out_encode(self):
         # 1 - restart
@@ -432,9 +490,7 @@ class ALARM_CLOCK(GPS303Pkt):
 class STOP_ALARM(GPS303Pkt):
     PROTO = 0x56
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.flag = payload[0]
         return self
 
@@ -442,16 +498,16 @@ class STOP_ALARM(GPS303Pkt):
 class SETUP(GPS303Pkt):
     PROTO = 0x57
     RESPOND = Respond.EXT
-    OUT_KWARGS = (  # TODO handle properly
-        ("uploadintervalseconds", int, 0x0300),
-        ("binaryswitch", int, 0b00110001),
-        ("alarms", int, [0, 0, 0]),
+    OUT_KWARGS = (
+        ("uploadintervalseconds", intx, 0x0300),
+        ("binaryswitch", intx, 0b00110001),
+        ("alarms", l3int, [0, 0, 0]),
         ("dndtimeswitch", int, 0),
-        ("dndtimes", int, [0, 0, 0]),
+        ("dndtimes", l3int, [0, 0, 0]),
         ("gpstimeswitch", int, 0),
         ("gpstimestart", int, 0),
         ("gpstimestop", int, 0),
-        ("phonenumbers", int, ["", "", ""]),
+        ("phonenumbers", l3str, ["", "", ""]),
     )
 
     def out_encode(self):
@@ -488,20 +544,25 @@ class RESTORE_PASSWORD(GPS303Pkt):
 class WIFI_POSITIONING(_WIFI_POSITIONING):
     PROTO = 0x69
     RESPOND = Respond.EXT
-    OUT_KWARGS = (("lat", float, None), ("lon", float, None))
+    OUT_KWARGS = (("latitude", float, None), ("longitude", float, None))
 
     def out_encode(self):
-        if self.lat is None or self.lon is None:
+        if self.latitude is None or self.longitude is None:
             return b""
-        return "{:+#010.8g},{:+#010.8g}".format(self.lat, self.lon).encode()
+        return "{:+#010.8g},{:+#010.8g}".format(
+            self.latitude, self.longitude
+        ).encode()
+
+    def out_decode(self, length, payload):
+        lat, lon = payload.decode().split(",")
+        self.latitude = float(lat)
+        self.longitude = float(lon)
 
 
 class MANUAL_POSITIONING(GPS303Pkt):
     PROTO = 0x80
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.flag = payload[0] if len(payload) > 0 else None
         self.reason = {
             1: "Incorrect time",
@@ -536,9 +597,7 @@ class POSITION_UPLOAD_INTERVAL(GPS303Pkt):
     RESPOND = Respond.EXT
     OUT_KWARGS = (("interval", int, 10),)
 
-    @classmethod
-    def from_packet(cls, length, payload):
-        self = super().from_packet(length, payload)
+    def in_decode(self, length, payload):
         self.interval = unpack("!H", payload[:2])
         return self
 
@@ -550,6 +609,15 @@ class SOS_ALARM(GPS303Pkt):
     PROTO = 0x99
 
 
+class UNKNOWN_B3(GPS303Pkt):
+    PROTO = 0xB3
+    IN_KWARGS = (("asciidata", str, ""),)
+
+    def in_decode(self, length, payload):
+        self.asciidata = payload.decode()
+        return self
+
+
 # Build dicts protocol number -> class and class name -> protocol number
 CLASSES = {}
 PROTOS = {}
@@ -595,29 +663,24 @@ def inline_response(packet):
     return None
 
 
-def make_object(length, proto, payload):
-    if proto in CLASSES:
-        return CLASSES[proto].from_packet(length, payload)
-    else:
-        retobj = UNKNOWN.from_packet(length, payload)
-        retobj.PROTO = proto  # Override class attr with object attr
-        return retobj
-
-
-def parse_message(packet):
+def parse_message(packet, is_incoming=True):
+    """From a packet (without framing bytes) derive the XXX.In object"""
     length, proto = unpack("BB", packet[:2])
     payload = packet[2:]
-    adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-    if (
-        proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
-        and length > 1
-        and len(payload) + adjust != length
-    ):
-        log.warning(
-            "With proto %d length is %d but payload length is %d+%d",
-            proto,
-            length,
-            len(payload),
-            adjust,
-        )
-    return make_object(length, proto, payload)
+    if proto not in CLASSES:
+        cause = ValueError(f"Proto {proto} is unknown")
+    else:
+        try:
+            if is_incoming:
+                return CLASSES[proto].In(length, payload)
+            else:
+                return CLASSES[proto].Out(length, payload)
+        except DecodeError as e:
+            cause = e
+    if is_incoming:
+        retobj = UNKNOWN.In(length, payload)
+    else:
+        retobj = UNKNOWN.Out(length, payload)
+    retobj.PROTO = proto  # Override class attr with object attr
+    retobj.cause = cause
+    return retobj