]> www.average.org Git - loctrkd.git/blobdiff - gps303/gps303proto.py
cleanup of gps303proto
[loctrkd.git] / gps303 / gps303proto.py
index 3959dc82242cb0181ee0204f1a7ea88cc63d86f5..04f535057ae2d6b95d7a3f25c75de2309cd3a293 100755 (executable)
@@ -17,13 +17,11 @@ Forewarnings:
 from datetime import datetime, timezone
 from enum import Enum
 from inspect import isclass
-from logging import getLogger
 from struct import pack, unpack
 
 __all__ = (
     "class_by_prefix",
     "inline_response",
-    "make_object",
     "parse_message",
     "proto_by_name",
     "Respond",
@@ -40,7 +38,18 @@ __all__ = (
     "WHITELIST_TOTAL",
     "WIFI_OFFLINE_POSITIONING",
     "TIME",
+    "PROHIBIT_LBS",
+    "GPS_LBS_SWITCH_TIMES",
+    "REMOTE_MONITOR_PHONE",
+    "SOS_PHONE",
+    "DAD_PHONE",
     "MOM_PHONE",
+    "STOP_UPLOAD",
+    "GPS_OFF_PERIOD",
+    "DND_PERIOD",
+    "RESTART_SHUTDOWN",
+    "DEVICE",
+    "ALARM_CLOCK",
     "STOP_ALARM",
     "SETUP",
     "SYNCHRONOUS_WHITELIST",
@@ -52,9 +61,43 @@ __all__ = (
     "CHARGER_DISCONNECTED",
     "VIBRATION_RECEIVED",
     "POSITION_UPLOAD_INTERVAL",
+    "SOS_ALARM",
+    "UNKNOWN_B3",
 )
 
-log = getLogger("gps303")
+
+def intx(x):
+    if isinstance(x, str):
+        x = int(x, 0)
+    return x
+
+
+def hhmm(x):
+    """Check for the string that represents hours and minutes"""
+    if not isinstance(x, str) or len(x) != 4:
+        raise ValueError(str(x) + " is not a four-character string")
+    hh = int(x[:2])
+    mm = int(x[2:])
+    if hh < 0 or hh > 23 or mm < 0 or mm > 59:
+        raise ValueError(str(x) + " does not contain valid hours and minutes")
+    return x
+
+
+def l3str(x):
+    if isinstance(x, str):
+        x = x.split(",")
+    if len(x) != 3 or not all(isinstance(el, str) for el in x):
+        raise ValueError(str(x) + " is not a list of three strings")
+    return x
+
+
+def l3int(x):
+    if isinstance(x, str):
+        x = x.split(",")
+        x = [int(el) for el in x]
+    if len(x) != 3 or not all(isinstance(el, int) for el in x):
+        raise ValueError(str(x) + " is not a list of three integers")
+    return x
 
 
 class MetaPkt(type):
@@ -241,7 +284,7 @@ class STATUS(GPS303Pkt):
         return self
 
     def out_encode(self):  # Set interval in minutes
-        return cls.make_packet(pack("B", self.upload_interval))
+        return pack("B", self.upload_interval)
 
 
 class HIBERNATION(GPS303Pkt):  # Server can send to send devicee to sleep
@@ -361,7 +404,11 @@ class STOP_UPLOAD(GPS303Pkt):  # Server response to LOGIN to thwart the device
 
 class GPS_OFF_PERIOD(GPS303Pkt):
     PROTO = 0x46
-    OUT_KWARGS = (("onoff", int, 0), ("fm", str, "0000"), ("to", str, "2359"))
+    OUT_KWARGS = (
+        ("onoff", int, 0),
+        ("fm", hhmm, "0000"),
+        ("to", hhmm, "2359"),
+    )
 
     def out_encode(self):
         return (
@@ -376,10 +423,10 @@ class DND_PERIOD(GPS303Pkt):
     OUT_KWARGS = (
         ("onoff", int, 0),
         ("week", int, 3),
-        ("fm1", str, "0000"),
-        ("to1", str, "2359"),
-        ("fm2", str, "0000"),
-        ("to2", str, "2359"),
+        ("fm1", hhmm, "0000"),
+        ("to1", hhmm, "2359"),
+        ("fm2", hhmm, "0000"),
+        ("to2", hhmm, "2359"),
     )
 
     def out_endode(self):
@@ -395,7 +442,7 @@ class DND_PERIOD(GPS303Pkt):
 
 class RESTART_SHUTDOWN(GPS303Pkt):
     PROTO = 0x48
-    OUT_KWARGS = (("flag", int, 2),)
+    OUT_KWARGS = (("flag", int, 0),)
 
     def out_encode(self):
         # 1 - restart
@@ -437,16 +484,16 @@ class STOP_ALARM(GPS303Pkt):
 class SETUP(GPS303Pkt):
     PROTO = 0x57
     RESPOND = Respond.EXT
-    OUT_KWARGS = (  # TODO handle properly
-        ("uploadintervalseconds", int, 0x0300),
-        ("binaryswitch", int, 0b00110001),
-        ("alarms", lambda x: x, [0, 0, 0]),
+    OUT_KWARGS = (
+        ("uploadintervalseconds", intx, 0x0300),
+        ("binaryswitch", intx, 0b00110001),
+        ("alarms", l3int, [0, 0, 0]),
         ("dndtimeswitch", int, 0),
-        ("dndtimes", lambda x: x, [0, 0, 0]),
+        ("dndtimes", l3int, [0, 0, 0]),
         ("gpstimeswitch", int, 0),
         ("gpstimestart", int, 0),
         ("gpstimestop", int, 0),
-        ("phonenumbers", lambda x: x, ["", "", ""]),
+        ("phonenumbers", l3str, ["", "", ""]),
     )
 
     def out_encode(self):
@@ -545,6 +592,17 @@ class SOS_ALARM(GPS303Pkt):
     PROTO = 0x99
 
 
+class UNKNOWN_B3(GPS303Pkt):
+    PROTO = 0xb3
+    IN_KWARGS = (("asciidata", str, ""),)
+
+    @classmethod
+    def from_packet(cls, length, payload):
+        self = super().from_packet(length, payload)
+        self.asciidata = payload.decode()
+        return self
+
+
 # Build dicts protocol number -> class and class name -> protocol number
 CLASSES = {}
 PROTOS = {}
@@ -590,29 +648,13 @@ def inline_response(packet):
     return None
 
 
-def make_object(length, proto, payload):
+def parse_message(packet):
+    """ From a packet (without framing bytes) derive the XXX.In object """
+    length, proto = unpack("BB", packet[:2])
+    payload = packet[2:]
     if proto in CLASSES:
         return CLASSES[proto].from_packet(length, payload)
     else:
         retobj = UNKNOWN.from_packet(length, payload)
         retobj.PROTO = proto  # Override class attr with object attr
         return retobj
-
-
-def parse_message(packet):
-    length, proto = unpack("BB", packet[:2])
-    payload = packet[2:]
-    adjust = 2 if proto == STATUS.PROTO else 4  # Weird special case
-    if (
-        proto not in (WIFI_POSITIONING.PROTO, WIFI_OFFLINE_POSITIONING.PROTO)
-        and length > 1
-        and len(payload) + adjust != length
-    ):
-        log.warning(
-            "With proto %d length is %d but payload length is %d+%d",
-            proto,
-            length,
-            len(payload),
-            adjust,
-        )
-    return make_object(length, proto, payload)